You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2017/12/14 20:35:15 UTC

[GitHub] merlimat closed pull request #844: DbLedgerStorage -- Make some entrylogger methods accessibles

merlimat closed pull request #844: DbLedgerStorage -- Make some entrylogger methods accessibles
URL: https://github.com/apache/bookkeeper/pull/844
 
 
   

This pull request was merged from a forked repository.
Because GitHub does not display the original diff of a pull request
from a fork once it has been merged, the diff is reproduced below for
the sake of provenance:

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 79cd2f930..6ae499e98 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -1437,6 +1437,11 @@ public Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC,
         return handle.waitForLastAddConfirmedUpdate(previoisLAC, observer);
     }
 
+    @VisibleForTesting
+    public LedgerStorage getLedgerStorage() {
+        return ledgerStorage;
+    }
+
     // The rest of the code is test stuff
     static class CounterCallback implements WriteCallback {
         int count;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index d2291513d..cdb11a600 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -26,6 +26,7 @@
 import static org.apache.bookkeeper.util.BookKeeperConstants.MAX_LOG_SIZE_LIMIT;
 
 import com.google.common.collect.MapMaker;
+import com.google.common.collect.Sets;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
@@ -40,6 +41,7 @@
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
+import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
@@ -52,6 +54,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -203,7 +206,7 @@ public ConcurrentLongLongHashMap getLedgersMap() {
     /**
      * Scan entries in a entry log file.
      */
-    interface EntryLogScanner {
+    public interface EntryLogScanner {
         /**
          * Tests whether or not the entries belongs to the specified ledger
          * should be processed.
@@ -837,7 +840,7 @@ void flushRotatedLogs() throws IOException {
         leastUnflushedLogId = flushedLogId + 1;
     }
 
-    void flush() throws IOException {
+    public void flush() throws IOException {
         flushRotatedLogs();
         flushCurrentLog();
     }
@@ -867,7 +870,7 @@ protected ByteBuf initialValue() throws Exception {
         }
     };
 
-    synchronized long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException {
+    public synchronized long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException {
         int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to prepend the size
         boolean reachEntryLogLimit =
             rollLog ? reachEntryLogLimit(entrySize) : readEntryLogHardLimit(entrySize);
@@ -963,7 +966,6 @@ void removeCurCompactionLog() {
         }
     }
 
-
     private void incrementBytesWrittenAndMaybeFlush(long bytesWritten) throws IOException {
         if (!doRegularFlushes) {
             return;
@@ -986,7 +988,8 @@ synchronized boolean readEntryLogHardLimit(long size) {
         return logChannel.position() + size > Integer.MAX_VALUE;
     }
 
-    ByteBuf readEntry(long ledgerId, long entryId, long location) throws IOException, Bookie.NoEntryException {
+    public ByteBuf internalReadEntry(long ledgerId, long entryId, long location)
+            throws IOException, Bookie.NoEntryException {
         long entryLogId = logIdForOffset(location);
         long pos = location & 0xffffffffL;
         ByteBuf sizeBuff = sizeBuffer.get();
@@ -1035,6 +1038,15 @@ ByteBuf readEntry(long ledgerId, long entryId, long location) throws IOException
                                               + pos + "(" + rc + "!=" + entrySize + ")", ledgerId, entryId);
         }
         data.writerIndex(entrySize);
+
+        return data;
+    }
+
+    public ByteBuf readEntry(long ledgerId, long entryId, long location) throws IOException, Bookie.NoEntryException {
+        long entryLogId = logIdForOffset(location);
+        long pos = location & 0xffffffffL;
+
+        ByteBuf data = internalReadEntry(ledgerId, entryId, location);
         long thisLedgerId = data.getLong(0);
         if (thisLedgerId != ledgerId) {
             data.release();
@@ -1112,6 +1124,35 @@ boolean logExists(long logId) {
         return false;
     }
 
+    /**
+     * Returns a set with the ids of all the entry log files.
+     *
+     * @throws IOException
+     */
+    public Set<Long> getEntryLogsSet() throws IOException {
+        Set<Long> entryLogs = Sets.newTreeSet();
+
+        final FilenameFilter logFileFilter = new FilenameFilter() {
+            @Override
+            public boolean accept(File dir, String name) {
+                return name.endsWith(".log");
+            }
+        };
+
+        for (File d : ledgerDirsManager.getAllLedgerDirs()) {
+            File[] files = d.listFiles(logFileFilter);
+            if (files == null) {
+                throw new IOException("Failed to get list of files in directory " + d);
+            }
+
+            for (File f : files) {
+                Long entryLogId = Long.parseLong(f.getName().split(".log")[0], 16);
+                entryLogs.add(entryLogId);
+            }
+        }
+        return entryLogs;
+    }
+
     private File findFile(long logId) throws FileNotFoundException {
         for (File d : ledgerDirsManager.getAllLedgerDirs()) {
             File f = new File(d, Long.toHexString(logId) + ".log");
@@ -1129,7 +1170,7 @@ private File findFile(long logId) throws FileNotFoundException {
      * @param scanner Entry Log Scanner
      * @throws IOException
      */
-    protected void scanEntryLog(long entryLogId, EntryLogScanner scanner) throws IOException {
+    public void scanEntryLog(long entryLogId, EntryLogScanner scanner) throws IOException {
         // Buffer where to read the entrySize (4 bytes) and the ledgerId (8 bytes)
         ByteBuf headerBuffer = Unpooled.buffer(4 + 8);
         BufferedReadChannel bc;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index 2b357eee4..b5c951372 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -25,6 +25,7 @@
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_GET_ENTRY;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_GET_OFFSET;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 
 import io.netty.buffer.ByteBuf;
@@ -83,7 +84,8 @@
     private OpStatsLogger getOffsetStats;
     private OpStatsLogger getEntryStats;
 
-    InterleavedLedgerStorage() {
+    @VisibleForTesting
+    public InterleavedLedgerStorage() {
         activeLedgers = new SnapshotMap<Long, Boolean>();
     }
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
index 527762b35..f402a9492 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
@@ -27,14 +27,18 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import com.google.common.collect.Sets;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.util.ArrayList;
 import java.util.List;
+
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.util.DiskChecker;
@@ -46,7 +50,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * Tests for EntryLog.
  */
@@ -351,4 +354,34 @@ public void testPreAllocateLog() throws Exception {
 
     }
 
+    /**
+     * Test the getEntryLogsSet() method.
+     */
+    @Test
+    public void testGetEntryLogsSet() throws Exception {
+        File tmpDir = createTempDir("bkTest", ".dir");
+        File curDir = Bookie.getCurrentDirectory(tmpDir);
+        Bookie.checkDirectoryStructure(curDir);
+
+        int gcWaitTime = 1000;
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setGcWaitTime(gcWaitTime);
+        conf.setLedgerDirNames(new String[] { tmpDir.toString() });
+        Bookie bookie = new Bookie(conf);
+
+        // create some entries
+        EntryLogger logger = ((InterleavedLedgerStorage) bookie.ledgerStorage).entryLogger;
+
+        assertEquals(Sets.newHashSet(0L, 1L), logger.getEntryLogsSet());
+
+        logger.rollLog();
+        logger.flushRotatedLogs();
+
+        assertEquals(Sets.newHashSet(0L, 1L, 2L), logger.getEntryLogsSet());
+
+        logger.rollLog();
+        logger.flushRotatedLogs();
+
+        assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), logger.getEntryLogsSet());
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services