You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2019/10/02 08:05:47 UTC

[bookkeeper] branch master updated: Avoid throwing exception when doing EntryLogger.internalReadEntry

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e63d3c  Avoid throwing exception when doing EntryLogger.internalReadEntry
1e63d3c is described below

commit 1e63d3c83e89d152cb603955e303b5377d8ab8e6
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Oct 2 01:05:42 2019 -0700

    Avoid throwing exception when doing EntryLogger.internalReadEntry
    
    ### Motivation
    
    In the refactoring done as part of #1819, the behavior of `internalReadEntry()` was changed to throw an exception when the entry being read belongs to a different ledger.
    
    This causes a significant performance issue when doing read-ahead from the ledger storage, because we keep reading from the current entry log until we find an entry from a different ledger.
    
    Reviewers: Ivan Kelly <iv...@apache.org>, Enrico Olivelli <eo...@gmail.com>
    
    This closes #2172 from merlimat/read-internal
---
 .../org/apache/bookkeeper/bookie/EntryLogger.java  | 80 ++++++++++++----------
 .../ldb/SingleDirectoryDbLedgerStorage.java        |  3 +-
 2 files changed, 45 insertions(+), 38 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index 6662d59..731275c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -756,37 +756,43 @@ public class EntryLogger {
         }
     }
 
-    private static class EntryLogEntry {
-        final int entrySize;
-        final BufferedReadChannel fc;
-
-        EntryLogEntry(int entrySize, BufferedReadChannel fc) {
-            this.entrySize = entrySize;
-            this.fc = fc;
-        }
-    }
-
-    private EntryLogEntry getFCForEntryInternal(
+    private BufferedReadChannel getFCForEntryInternal(
             long ledgerId, long entryId, long entryLogId, long pos)
             throws EntryLookupException, IOException {
-        ByteBuf sizeBuff = sizeBuffer.get();
-        sizeBuff.clear();
-        pos -= 4; // we want to get the entrySize as well as the ledgerId and entryId
-        BufferedReadChannel fc;
         try {
-            fc = getChannelForLogId(entryLogId);
+            return getChannelForLogId(entryLogId);
         } catch (FileNotFoundException e) {
             throw new EntryLookupException.MissingLogFileException(ledgerId, entryId, entryLogId, pos);
         }
+    }
+
+    private ByteBuf readEntrySize(long ledgerId, long entryId, long entryLogId, long pos, BufferedReadChannel fc)
+            throws EntryLookupException, IOException {
+        ByteBuf sizeBuff = sizeBuffer.get();
+        sizeBuff.clear();
+
+        long entrySizePos = pos - 4; // we want to get the entrySize as well as the ledgerId and entryId
 
         try {
-            if (readFromLogChannel(entryLogId, fc, sizeBuff, pos) != sizeBuff.capacity()) {
-                throw new EntryLookupException.MissingEntryException(ledgerId, entryId, entryLogId, pos);
+            if (readFromLogChannel(entryLogId, fc, sizeBuff, entrySizePos) != sizeBuff.capacity()) {
+                throw new EntryLookupException.MissingEntryException(ledgerId, entryId, entryLogId, entrySizePos);
             }
         } catch (BufferedChannelBase.BufferedChannelClosedException | AsynchronousCloseException e) {
-            throw new EntryLookupException.MissingLogFileException(ledgerId, entryId, entryLogId, pos);
+            throw new EntryLookupException.MissingLogFileException(ledgerId, entryId, entryLogId, entrySizePos);
         }
-        pos += 4;
+        return sizeBuff;
+    }
+
+    void checkEntry(long ledgerId, long entryId, long location) throws EntryLookupException, IOException {
+        long entryLogId = logIdForOffset(location);
+        long pos = posForOffset(location);
+        BufferedReadChannel fc = getFCForEntryInternal(ledgerId, entryId, entryLogId, pos);
+        ByteBuf sizeBuf = readEntrySize(ledgerId, entryId, entryLogId, pos, fc);
+        validateEntry(ledgerId, entryId, entryLogId, pos, sizeBuf);
+    }
+
+    private void validateEntry(long ledgerId, long entryId, long entryLogId, long pos, ByteBuf sizeBuff)
+            throws IOException, EntryLookupException {
         int entrySize = sizeBuff.readInt();
 
         // entrySize does not include the ledgerId
@@ -805,23 +811,24 @@ public class EntryLogger {
             throw new EntryLookupException.WrongEntryException(
                     thisEntryId, thisLedgerId, ledgerId, entryId, entryLogId, pos);
         }
-        return new EntryLogEntry(entrySize, fc);
-    }
-
-    void checkEntry(long ledgerId, long entryId, long location) throws EntryLookupException, IOException {
-        long entryLogId = logIdForOffset(location);
-        long pos = posForOffset(location);
-        getFCForEntryInternal(ledgerId, entryId, entryLogId, pos);
     }
 
-    public ByteBuf internalReadEntry(long ledgerId, long entryId, long location)
+    public ByteBuf internalReadEntry(long ledgerId, long entryId, long location, boolean validateEntry)
             throws IOException, Bookie.NoEntryException {
         long entryLogId = logIdForOffset(location);
         long pos = posForOffset(location);
 
-        final EntryLogEntry entry;
+
+        BufferedReadChannel fc = null;
+        int entrySize = -1;
         try {
-            entry = getFCForEntryInternal(ledgerId, entryId, entryLogId, pos);
+            fc = getFCForEntryInternal(ledgerId, entryId, entryLogId, pos);
+
+            ByteBuf sizeBuff = readEntrySize(ledgerId, entryId, entryLogId, pos, fc);
+            entrySize = sizeBuff.getInt(0);
+            if (validateEntry) {
+                validateEntry(ledgerId, entryId, entryLogId, pos, sizeBuff);
+            }
         } catch (EntryLookupException.MissingEntryException entryLookupError) {
             throw new Bookie.NoEntryException("Short read from entrylog " + entryLogId,
                     ledgerId, entryId);
@@ -829,9 +836,9 @@ public class EntryLogger {
             throw new IOException(e.toString());
         }
 
-        ByteBuf data = allocator.buffer(entry.entrySize, entry.entrySize);
-        int rc = readFromLogChannel(entryLogId, entry.fc, data, pos);
-        if (rc != entry.entrySize) {
+        ByteBuf data = allocator.buffer(entrySize, entrySize);
+        int rc = readFromLogChannel(entryLogId, fc, data, pos);
+        if (rc != entrySize) {
             // Note that throwing NoEntryException here instead of IOException is not
             // without risk. If all bookies in a quorum throw this same exception
             // the client will assume that it has reached the end of the ledger.
@@ -842,16 +849,15 @@ public class EntryLogger {
             data.release();
             throw new Bookie.NoEntryException("Short read for " + ledgerId + "@"
                                               + entryId + " in " + entryLogId + "@"
-                                              + pos + "(" + rc + "!=" + entry.entrySize + ")", ledgerId, entryId);
+                                              + pos + "(" + rc + "!=" + entrySize + ")", ledgerId, entryId);
         }
-        data.writerIndex(entry.entrySize);
+        data.writerIndex(entrySize);
 
         return data;
     }
 
     public ByteBuf readEntry(long ledgerId, long entryId, long location) throws IOException, Bookie.NoEntryException {
-        ByteBuf data = internalReadEntry(ledgerId, entryId, location);
-        return data;
+        return internalReadEntry(ledgerId, entryId, location, true /* validateEntry */);
     }
 
     /**
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
index 58ce2be..e975f16 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
@@ -466,7 +466,8 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
             long size = 0;
 
             while (count < readAheadCacheBatchSize && currentEntryLogId == firstEntryLogId) {
-                ByteBuf entry = entryLogger.internalReadEntry(orginalLedgerId, -1, currentEntryLocation);
+                ByteBuf entry = entryLogger.internalReadEntry(orginalLedgerId, -1, currentEntryLocation,
+                        false /* validateEntry */);
 
                 try {
                     long currentEntryLedgerId = entry.getLong(0);