You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2019/10/02 08:05:47 UTC
[bookkeeper] branch master updated: Avoid throwing exception when
doing EntryLogger.internalReadEntry
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 1e63d3c Avoid throwing exception when doing EntryLogger.internalReadEntry
1e63d3c is described below
commit 1e63d3c83e89d152cb603955e303b5377d8ab8e6
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Oct 2 01:05:42 2019 -0700
Avoid throwing exception when doing EntryLogger.internalReadEntry
### Motivation
In the refactoring done as part of #1819, the behavior of `internalReadEntry()` was changed to throw an exception when reading an entry from a different ledger.
This is causing a big performance issue when doing read-ahead from the ledger storage, because we keep reading from the current entry log until we find an entry from a different ledger.
Reviewers: Ivan Kelly <iv...@apache.org>, Enrico Olivelli <eo...@gmail.com>
This closes #2172 from merlimat/read-internal
---
.../org/apache/bookkeeper/bookie/EntryLogger.java | 80 ++++++++++++----------
.../ldb/SingleDirectoryDbLedgerStorage.java | 3 +-
2 files changed, 45 insertions(+), 38 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index 6662d59..731275c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -756,37 +756,43 @@ public class EntryLogger {
}
}
- private static class EntryLogEntry {
- final int entrySize;
- final BufferedReadChannel fc;
-
- EntryLogEntry(int entrySize, BufferedReadChannel fc) {
- this.entrySize = entrySize;
- this.fc = fc;
- }
- }
-
- private EntryLogEntry getFCForEntryInternal(
+ private BufferedReadChannel getFCForEntryInternal(
long ledgerId, long entryId, long entryLogId, long pos)
throws EntryLookupException, IOException {
- ByteBuf sizeBuff = sizeBuffer.get();
- sizeBuff.clear();
- pos -= 4; // we want to get the entrySize as well as the ledgerId and entryId
- BufferedReadChannel fc;
try {
- fc = getChannelForLogId(entryLogId);
+ return getChannelForLogId(entryLogId);
} catch (FileNotFoundException e) {
throw new EntryLookupException.MissingLogFileException(ledgerId, entryId, entryLogId, pos);
}
+ }
+
+ private ByteBuf readEntrySize(long ledgerId, long entryId, long entryLogId, long pos, BufferedReadChannel fc)
+ throws EntryLookupException, IOException {
+ ByteBuf sizeBuff = sizeBuffer.get();
+ sizeBuff.clear();
+
+ long entrySizePos = pos - 4; // we want to get the entrySize as well as the ledgerId and entryId
try {
- if (readFromLogChannel(entryLogId, fc, sizeBuff, pos) != sizeBuff.capacity()) {
- throw new EntryLookupException.MissingEntryException(ledgerId, entryId, entryLogId, pos);
+ if (readFromLogChannel(entryLogId, fc, sizeBuff, entrySizePos) != sizeBuff.capacity()) {
+ throw new EntryLookupException.MissingEntryException(ledgerId, entryId, entryLogId, entrySizePos);
}
} catch (BufferedChannelBase.BufferedChannelClosedException | AsynchronousCloseException e) {
- throw new EntryLookupException.MissingLogFileException(ledgerId, entryId, entryLogId, pos);
+ throw new EntryLookupException.MissingLogFileException(ledgerId, entryId, entryLogId, entrySizePos);
}
- pos += 4;
+ return sizeBuff;
+ }
+
+ void checkEntry(long ledgerId, long entryId, long location) throws EntryLookupException, IOException {
+ long entryLogId = logIdForOffset(location);
+ long pos = posForOffset(location);
+ BufferedReadChannel fc = getFCForEntryInternal(ledgerId, entryId, entryLogId, pos);
+ ByteBuf sizeBuf = readEntrySize(ledgerId, entryId, entryLogId, pos, fc);
+ validateEntry(ledgerId, entryId, entryLogId, pos, sizeBuf);
+ }
+
+ private void validateEntry(long ledgerId, long entryId, long entryLogId, long pos, ByteBuf sizeBuff)
+ throws IOException, EntryLookupException {
int entrySize = sizeBuff.readInt();
// entrySize does not include the ledgerId
@@ -805,23 +811,24 @@ public class EntryLogger {
throw new EntryLookupException.WrongEntryException(
thisEntryId, thisLedgerId, ledgerId, entryId, entryLogId, pos);
}
- return new EntryLogEntry(entrySize, fc);
- }
-
- void checkEntry(long ledgerId, long entryId, long location) throws EntryLookupException, IOException {
- long entryLogId = logIdForOffset(location);
- long pos = posForOffset(location);
- getFCForEntryInternal(ledgerId, entryId, entryLogId, pos);
}
- public ByteBuf internalReadEntry(long ledgerId, long entryId, long location)
+ public ByteBuf internalReadEntry(long ledgerId, long entryId, long location, boolean validateEntry)
throws IOException, Bookie.NoEntryException {
long entryLogId = logIdForOffset(location);
long pos = posForOffset(location);
- final EntryLogEntry entry;
+
+ BufferedReadChannel fc = null;
+ int entrySize = -1;
try {
- entry = getFCForEntryInternal(ledgerId, entryId, entryLogId, pos);
+ fc = getFCForEntryInternal(ledgerId, entryId, entryLogId, pos);
+
+ ByteBuf sizeBuff = readEntrySize(ledgerId, entryId, entryLogId, pos, fc);
+ entrySize = sizeBuff.getInt(0);
+ if (validateEntry) {
+ validateEntry(ledgerId, entryId, entryLogId, pos, sizeBuff);
+ }
} catch (EntryLookupException.MissingEntryException entryLookupError) {
throw new Bookie.NoEntryException("Short read from entrylog " + entryLogId,
ledgerId, entryId);
@@ -829,9 +836,9 @@ public class EntryLogger {
throw new IOException(e.toString());
}
- ByteBuf data = allocator.buffer(entry.entrySize, entry.entrySize);
- int rc = readFromLogChannel(entryLogId, entry.fc, data, pos);
- if (rc != entry.entrySize) {
+ ByteBuf data = allocator.buffer(entrySize, entrySize);
+ int rc = readFromLogChannel(entryLogId, fc, data, pos);
+ if (rc != entrySize) {
// Note that throwing NoEntryException here instead of IOException is not
// without risk. If all bookies in a quorum throw this same exception
// the client will assume that it has reached the end of the ledger.
@@ -842,16 +849,15 @@ public class EntryLogger {
data.release();
throw new Bookie.NoEntryException("Short read for " + ledgerId + "@"
+ entryId + " in " + entryLogId + "@"
- + pos + "(" + rc + "!=" + entry.entrySize + ")", ledgerId, entryId);
+ + pos + "(" + rc + "!=" + entrySize + ")", ledgerId, entryId);
}
- data.writerIndex(entry.entrySize);
+ data.writerIndex(entrySize);
return data;
}
public ByteBuf readEntry(long ledgerId, long entryId, long location) throws IOException, Bookie.NoEntryException {
- ByteBuf data = internalReadEntry(ledgerId, entryId, location);
- return data;
+ return internalReadEntry(ledgerId, entryId, location, true /* validateEntry */);
}
/**
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
index 58ce2be..e975f16 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
@@ -466,7 +466,8 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
long size = 0;
while (count < readAheadCacheBatchSize && currentEntryLogId == firstEntryLogId) {
- ByteBuf entry = entryLogger.internalReadEntry(orginalLedgerId, -1, currentEntryLocation);
+ ByteBuf entry = entryLogger.internalReadEntry(orginalLedgerId, -1, currentEntryLocation,
+ false /* validateEntry */);
try {
long currentEntryLedgerId = entry.getLong(0);