You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/12/28 21:58:16 UTC
[kafka] branch trunk updated: MINOR: Improve exception messages in
FileChannelRecordBatch (#6068)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9295444 MINOR: Improve exception messages in FileChannelRecordBatch (#6068)
9295444 is described below
commit 9295444d48eb057900ef09f1176e34b37331f60b
Author: Flavien Raynaud <fl...@gmail.com>
AuthorDate: Fri Dec 28 21:58:05 2018 +0000
MINOR: Improve exception messages in FileChannelRecordBatch (#6068)
Replace `channel` with `fileRecords` in the messages of any KafkaException
thrown when loading/writing a `FileChannelRecordBatch`. This makes exception
messages more readable (`channel` only shows an object hashcode, whereas
`fileRecords` shows the path of the file being read and the start/end positions in the file).
Reviewers: Jason Gustafson <ja...@confluent.io>
---
.../common/record/AbstractLegacyRecordBatch.java | 5 ++---
.../kafka/common/record/DefaultRecordBatch.java | 5 ++---
.../kafka/common/record/FileLogInputStream.java | 24 ++++++++++++++--------
3 files changed, 19 insertions(+), 15 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
index cdf731c..1d50a15 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
@@ -29,7 +29,6 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.NoSuchElementException;
@@ -530,10 +529,10 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
LegacyFileChannelRecordBatch(long offset,
byte magic,
- FileChannel channel,
+ FileRecords fileRecords,
int position,
int batchSize) {
- super(offset, magic, channel, position, batchSize);
+ super(offset, magic, fileRecords, position, batchSize);
}
@Override
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index 71e668e..5156c64 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -28,7 +28,6 @@ import java.io.EOFException;
import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
@@ -586,10 +585,10 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
DefaultFileChannelRecordBatch(long offset,
byte magic,
- FileChannel channel,
+ FileRecords fileRecords,
int position,
int batchSize) {
- super(offset, magic, channel, position, batchSize);
+ super(offset, magic, fileRecords, position, batchSize);
}
@Override
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
index 92e8864..472c7a7 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
@@ -82,9 +82,9 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
final FileChannelRecordBatch batch;
if (magic < RecordBatch.MAGIC_VALUE_V2)
- batch = new LegacyFileChannelRecordBatch(offset, magic, channel, position, size);
+ batch = new LegacyFileChannelRecordBatch(offset, magic, fileRecords, position, size);
else
- batch = new DefaultFileChannelRecordBatch(offset, magic, channel, position, size);
+ batch = new DefaultFileChannelRecordBatch(offset, magic, fileRecords, position, size);
position += batch.sizeInBytes();
return batch;
@@ -98,7 +98,7 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
public abstract static class FileChannelRecordBatch extends AbstractRecordBatch {
protected final long offset;
protected final byte magic;
- protected final FileChannel channel;
+ protected final FileRecords fileRecords;
protected final int position;
protected final int batchSize;
@@ -107,12 +107,12 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
FileChannelRecordBatch(long offset,
byte magic,
- FileChannel channel,
+ FileRecords fileRecords,
int position,
int batchSize) {
this.offset = offset;
this.magic = magic;
- this.channel = channel;
+ this.fileRecords = fileRecords;
this.position = position;
this.batchSize = batchSize;
}
@@ -173,14 +173,14 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
@Override
public void writeTo(ByteBuffer buffer) {
+ FileChannel channel = fileRecords.channel();
try {
int limit = buffer.limit();
buffer.limit(buffer.position() + sizeInBytes());
Utils.readFully(channel, buffer, position);
buffer.limit(limit);
} catch (IOException e) {
- throw new KafkaException("Failed to read record batch at position " + position + " from file channel " +
- channel, e);
+ throw new KafkaException("Failed to read record batch at position " + position + " from " + fileRecords, e);
}
}
@@ -207,13 +207,14 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
}
private RecordBatch loadBatchWithSize(int size, String description) {
+ FileChannel channel = fileRecords.channel();
try {
ByteBuffer buffer = ByteBuffer.allocate(size);
Utils.readFullyOrFail(channel, buffer, position, description);
buffer.rewind();
return toMemoryRecordBatch(buffer);
} catch (IOException e) {
- throw new KafkaException(e);
+ throw new KafkaException("Failed to load record batch at position " + position + " from " + fileRecords, e);
}
}
@@ -226,14 +227,19 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
FileChannelRecordBatch that = (FileChannelRecordBatch) o;
+ FileChannel channel = fileRecords == null ? null : fileRecords.channel();
+ FileChannel thatChannel = that.fileRecords == null ? null : that.fileRecords.channel();
+
return offset == that.offset &&
position == that.position &&
batchSize == that.batchSize &&
- (channel == null ? that.channel == null : channel.equals(that.channel));
+ (channel == null ? thatChannel == null : channel.equals(thatChannel));
}
@Override
public int hashCode() {
+ FileChannel channel = fileRecords == null ? null : fileRecords.channel();
+
int result = (int) (offset ^ (offset >>> 32));
result = 31 * result + (channel != null ? channel.hashCode() : 0);
result = 31 * result + position;