Posted to commits@kafka.apache.org by jg...@apache.org on 2018/12/28 22:05:13 UTC

[kafka] branch 2.1 updated: MINOR: Improve exception messages in FileChannelRecordBatch (#6068)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new ed08921  MINOR: Improve exception messages in FileChannelRecordBatch (#6068)
ed08921 is described below

commit ed089211f7dff6db66f2cb7dacadc98d70e49b20
Author: Flavien Raynaud <fl...@gmail.com>
AuthorDate: Fri Dec 28 21:58:05 2018 +0000

    MINOR: Improve exception messages in FileChannelRecordBatch (#6068)
    
    Replace `channel` with `fileRecords` in the KafkaException messages that may be
    thrown when loading or writing a `FileChannelRecordBatch`. This makes the exception
    messages more readable: `channel` only prints an object hashcode, whereas `fileRecords`
    prints the path of the file being read and the start/end positions within it.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
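
To make the effect concrete, here is a minimal, self-contained sketch of the pattern the
patch adopts. It does not use the actual Kafka classes: `DescribedSlice`, `readFailure`
and the sample log path are hypothetical stand-ins for `FileRecords` and its usage, and a
plain `RuntimeException` stands in for `KafkaException`. A `FileChannel`'s default
toString() is only a class name plus identity hash, so an exception message built from it
identifies nothing useful; a holder that knows the file and the byte range being read can
point straight at the failing segment.

import java.io.File;
import java.io.IOException;

public class ExceptionMessageSketch {

    // Hypothetical stand-in for FileRecords: it knows the backing file and the slice
    // bounds, so its toString() can describe exactly what was being read.
    static final class DescribedSlice {
        private final File file;
        private final int start;
        private final int end;

        DescribedSlice(File file, int start, int end) {
            this.file = file;
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "DescribedSlice(file=" + file + ", start=" + start + ", end=" + end + ")";
        }
    }

    // Mirrors the pattern of the patch: build the exception message from the descriptive
    // holder rather than from the FileChannel, whose toString() is just an object hashcode.
    static RuntimeException readFailure(DescribedSlice source, int position, IOException cause) {
        return new RuntimeException(
                "Failed to read record batch at position " + position + " from " + source, cause);
    }

    public static void main(String[] args) {
        DescribedSlice source = new DescribedSlice(
                new File("/tmp/kafka-logs/topic-0/00000000000000000000.log"), 0, 1048576);
        System.out.println(readFailure(source, 4096, new IOException("disk error")).getMessage());
        // Prints: Failed to read record batch at position 4096 from
        //         DescribedSlice(file=/tmp/kafka-logs/topic-0/00000000000000000000.log, start=0, end=1048576)
    }
}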
---
 .../common/record/AbstractLegacyRecordBatch.java   |  5 ++---
 .../kafka/common/record/DefaultRecordBatch.java    |  5 ++---
 .../kafka/common/record/FileLogInputStream.java    | 24 ++++++++++++++--------
 3 files changed, 19 insertions(+), 15 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
index cdf731c..1d50a15 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
@@ -29,7 +29,6 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
 import java.util.ArrayDeque;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
@@ -530,10 +529,10 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
 
         LegacyFileChannelRecordBatch(long offset,
                                      byte magic,
-                                     FileChannel channel,
+                                     FileRecords fileRecords,
                                      int position,
                                      int batchSize) {
-            super(offset, magic, channel, position, batchSize);
+            super(offset, magic, fileRecords, position, batchSize);
         }
 
         @Override
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index 71e668e..5156c64 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -28,7 +28,6 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
@@ -586,10 +585,10 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
 
         DefaultFileChannelRecordBatch(long offset,
                                       byte magic,
-                                      FileChannel channel,
+                                      FileRecords fileRecords,
                                       int position,
                                       int batchSize) {
-            super(offset, magic, channel, position, batchSize);
+            super(offset, magic, fileRecords, position, batchSize);
         }
 
         @Override
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
index 92e8864..472c7a7 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
@@ -82,9 +82,9 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
         final FileChannelRecordBatch batch;
 
         if (magic < RecordBatch.MAGIC_VALUE_V2)
-            batch = new LegacyFileChannelRecordBatch(offset, magic, channel, position, size);
+            batch = new LegacyFileChannelRecordBatch(offset, magic, fileRecords, position, size);
         else
-            batch = new DefaultFileChannelRecordBatch(offset, magic, channel, position, size);
+            batch = new DefaultFileChannelRecordBatch(offset, magic, fileRecords, position, size);
 
         position += batch.sizeInBytes();
         return batch;
@@ -98,7 +98,7 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
     public abstract static class FileChannelRecordBatch extends AbstractRecordBatch {
         protected final long offset;
         protected final byte magic;
-        protected final FileChannel channel;
+        protected final FileRecords fileRecords;
         protected final int position;
         protected final int batchSize;
 
@@ -107,12 +107,12 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
 
         FileChannelRecordBatch(long offset,
                                byte magic,
-                               FileChannel channel,
+                               FileRecords fileRecords,
                                int position,
                                int batchSize) {
             this.offset = offset;
             this.magic = magic;
-            this.channel = channel;
+            this.fileRecords = fileRecords;
             this.position = position;
             this.batchSize = batchSize;
         }
@@ -173,14 +173,14 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
 
         @Override
         public void writeTo(ByteBuffer buffer) {
+            FileChannel channel = fileRecords.channel();
             try {
                 int limit = buffer.limit();
                 buffer.limit(buffer.position() + sizeInBytes());
                 Utils.readFully(channel, buffer, position);
                 buffer.limit(limit);
             } catch (IOException e) {
-                throw new KafkaException("Failed to read record batch at position " + position + " from file channel " +
-                        channel, e);
+                throw new KafkaException("Failed to read record batch at position " + position + " from " + fileRecords, e);
             }
         }
 
@@ -207,13 +207,14 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
         }
 
         private RecordBatch loadBatchWithSize(int size, String description) {
+            FileChannel channel = fileRecords.channel();
             try {
                 ByteBuffer buffer = ByteBuffer.allocate(size);
                 Utils.readFullyOrFail(channel, buffer, position, description);
                 buffer.rewind();
                 return toMemoryRecordBatch(buffer);
             } catch (IOException e) {
-                throw new KafkaException(e);
+                throw new KafkaException("Failed to load record batch at position " + position + " from " + fileRecords, e);
             }
         }
 
@@ -226,14 +227,19 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
 
             FileChannelRecordBatch that = (FileChannelRecordBatch) o;
 
+            FileChannel channel = fileRecords == null ? null : fileRecords.channel();
+            FileChannel thatChannel = that.fileRecords == null ? null : that.fileRecords.channel();
+
             return offset == that.offset &&
                     position == that.position &&
                     batchSize == that.batchSize &&
-                    (channel == null ? that.channel == null : channel.equals(that.channel));
+                    (channel == null ? thatChannel == null : channel.equals(thatChannel));
         }
 
         @Override
         public int hashCode() {
+            FileChannel channel = fileRecords == null ? null : fileRecords.channel();
+
             int result = (int) (offset ^ (offset >>> 32));
             result = 31 * result + (channel != null ? channel.hashCode() : 0);
             result = 31 * result + position;