You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/05/08 09:21:53 UTC

[GitHub] sijie closed pull request #1375: BP14 - forceLedger bookie side implementation

sijie closed pull request #1375: BP14 - forceLedger bookie side implementation
URL: https://github.com/apache/bookkeeper/pull/1375
 
 
   

This is a PR merged from a forked repository (a foreign pull request).
Because GitHub hides the original diff of such a pull request once it
has been merged, the diff is reproduced below for the sake of provenance:

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
index 8b1665e00..d2ce94bbc 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
@@ -70,6 +70,7 @@
     String BOOKIE_ADD_ENTRY = "BOOKIE_ADD_ENTRY";
     String BOOKIE_RECOVERY_ADD_ENTRY = "BOOKIE_RECOVERY_ADD_ENTRY";
     String BOOKIE_READ_ENTRY = "BOOKIE_READ_ENTRY";
+    String BOOKIE_FORCE_LEDGER = "BOOKIE_FORCE_LEDGER";
     String BOOKIE_READ_LAST_CONFIRMED = "BOOKIE_READ_LAST_CONFIRMED";
     String BOOKIE_ADD_ENTRY_BYTES = "BOOKIE_ADD_ENTRY_BYTES";
     String BOOKIE_READ_ENTRY_BYTES = "BOOKIE_READ_ENTRY_BYTES";
@@ -80,6 +81,7 @@
 
     String JOURNAL_SCOPE = "journal";
     String JOURNAL_ADD_ENTRY = "JOURNAL_ADD_ENTRY";
+    String JOURNAL_FORCE_LEDGER = "JOURNAL_FORCE_LEDGER";
     String JOURNAL_SYNC = "JOURNAL_SYNC";
     String JOURNAL_MEM_ADD_ENTRY = "JOURNAL_MEM_ADD_ENTRY";
     String JOURNAL_PREALLOCATION = "JOURNAL_PREALLOCATION";
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index b391c531c..76af6eb39 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -23,6 +23,7 @@
 
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_ADD_ENTRY;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_ADD_ENTRY_BYTES;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_FORCE_LEDGER;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_READ_ENTRY;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_READ_ENTRY_BYTES;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_RECOVERY_ADD_ENTRY;
@@ -119,6 +120,7 @@
 
     static final long METAENTRY_ID_LEDGER_KEY = -0x1000;
     static final long METAENTRY_ID_FENCE_KEY  = -0x2000;
+    static final long METAENTRY_ID_FORCE_LEDGER  = -0x4000;
 
     private final LedgerDirsManager ledgerDirsManager;
     private LedgerDirsManager indexDirsManager;
@@ -139,6 +141,7 @@
     private final StatsLogger statsLogger;
     private final Counter writeBytes;
     private final Counter readBytes;
+    private final Counter forceLedgerOps;
     // Bookie Operation Latency Stats
     private final OpStatsLogger addEntryStats;
     private final OpStatsLogger recoveryAddEntryStats;
@@ -734,6 +737,7 @@ public void start() {
         // Expose Stats
         writeBytes = statsLogger.getCounter(WRITE_BYTES);
         readBytes = statsLogger.getCounter(READ_BYTES);
+        forceLedgerOps = statsLogger.getCounter(BOOKIE_FORCE_LEDGER);
         addEntryStats = statsLogger.getOpStatsLogger(BOOKIE_ADD_ENTRY);
         recoveryAddEntryStats = statsLogger.getOpStatsLogger(BOOKIE_RECOVERY_ADD_ENTRY);
         readEntryStats = statsLogger.getOpStatsLogger(BOOKIE_READ_ENTRY);
@@ -1201,6 +1205,21 @@ public ByteBuf getExplicitLac(long ledgerId) throws IOException, Bookie.NoLedger
         return lac;
     }
 
+    /**
+     * Force the journal entries of the given 'ledgerId' to be synced to disk.
+     * It works like a regular addEntry with ackBeforeSync=false.
+     * This is useful for ledgers with the DEFERRED_SYNC write flag.
+     */
+    public void forceLedger(long ledgerId, WriteCallback cb,
+                            Object ctx) {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Forcing ledger {}", ledgerId);
+        }
+        Journal journal = getJournal(ledgerId);
+        journal.forceLedger(ledgerId, cb, ctx);
+        forceLedgerOps.inc();
+    }
+
     /**
      * Add entry to a ledger.
      * @throws BookieException.LedgerFencedException if the ledger is fenced
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 94aafad18..71477c027 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -595,6 +595,7 @@ static void writePaddingBytes(JournalChannel jc, ByteBuf paddingBuffer, int jour
 
     // Expose Stats
     private final OpStatsLogger journalAddEntryStats;
+    private final OpStatsLogger journalForceLedgerStats;
     private final OpStatsLogger journalSyncStats;
     private final OpStatsLogger journalCreationStats;
     private final OpStatsLogger journalFlushStats;
@@ -656,6 +657,7 @@ public Journal(int journalIndex, File journalDirectory, ServerConfiguration conf
 
         // Expose Stats
         journalAddEntryStats = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_ADD_ENTRY);
+        journalForceLedgerStats = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_FORCE_LEDGER);
         journalSyncStats = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_SYNC);
         journalCreationStats = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_CREATION_LATENCY);
         journalFlushStats = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_FLUSH_LATENCY);
@@ -869,6 +871,14 @@ void logAddEntry(long ledgerId, long entryId, ByteBuf entry,
                 journalAddEntryStats, journalQueueSize));
     }
 
+    void forceLedger(long ledgerId, WriteCallback cb, Object ctx) {
+        journalQueueSize.inc();
+        queue.add(QueueEntry.create(
+                null, false /* ackBeforeSync */,  ledgerId,
+                Bookie.METAENTRY_ID_FORCE_LEDGER, cb, ctx, MathUtils.nowInNano(),
+                journalForceLedgerStats, journalQueueSize));
+    }
+
     /**
      * Get the length of journal entries queue.
      *
@@ -1072,22 +1082,23 @@ public void run() {
                 if (qe == null) { // no more queue entry
                     continue;
                 }
+                if (qe.entryId != Bookie.METAENTRY_ID_FORCE_LEDGER) {
+                    int entrySize = qe.entry.readableBytes();
+                    journalWriteBytes.add(entrySize);
+                    journalQueueSize.dec();
 
-                int entrySize = qe.entry.readableBytes();
-                journalWriteBytes.add(entrySize);
-                journalQueueSize.dec();
+                    batchSize += (4 + entrySize);
 
-                batchSize += (4 + entrySize);
+                    lenBuff.clear();
+                    lenBuff.writeInt(entrySize);
 
-                lenBuff.clear();
-                lenBuff.writeInt(entrySize);
-
-                // preAlloc based on size
-                logFile.preAllocIfNeeded(4 + entrySize);
+                    // preAlloc based on size
+                    logFile.preAllocIfNeeded(4 + entrySize);
 
-                bc.write(lenBuff);
-                bc.write(qe.entry);
-                qe.entry.release();
+                    bc.write(lenBuff);
+                    bc.write(qe.entry);
+                    qe.entry.release();
+                }
 
                 toFlush.add(qe);
                 numEntriesToFlush++;
@@ -1159,4 +1170,5 @@ private static int fullRead(JournalChannel fc, ByteBuffer bb) throws IOException
     public void joinThread() throws InterruptedException {
         join();
     }
+
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java
index 6714d9cbe..3a8ad8646 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java
@@ -314,4 +314,63 @@ public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddre
         return supportQueue;
     }
 
+    @Test
+    public void testForceLedger() throws Exception {
+        File journalDir = tempDir.newFolder();
+        Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(journalDir));
+
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setJournalDirName(journalDir.getPath());
+
+        JournalChannel jc = spy(new JournalChannel(journalDir, 1));
+        whenNew(JournalChannel.class).withAnyArguments().thenReturn(jc);
+
+        LedgerDirsManager ledgerDirsManager = mock(LedgerDirsManager.class);
+        Journal journal = new Journal(0, journalDir, conf, ledgerDirsManager);
+
+        // machinery to suspend ForceWriteThread
+        CountDownLatch forceWriteThreadSuspendedLatch = new CountDownLatch(1);
+        LinkedBlockingQueue<ForceWriteRequest> supportQueue =
+                enableForceWriteThreadSuspension(forceWriteThreadSuspendedLatch, journal);
+        journal.start();
+
+        LogMark lastLogMarkBeforeWrite = journal.getLastLogMark().markLog().getCurMark();
+        CountDownLatch latch = new CountDownLatch(1);
+        long ledgerId = 1;
+        journal.forceLedger(ledgerId, new WriteCallback() {
+            @Override
+            public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx) {
+                latch.countDown();
+            }
+        }, null);
+
+        // forceLedger should not complete while ForceWriteThread is suspended
+        // wait until an entry is written to the ForceWriteThread queue
+        while (supportQueue.isEmpty()) {
+            Thread.sleep(100);
+        }
+        assertEquals(1, latch.getCount());
+        assertEquals(1, supportQueue.size());
+
+        // in constructor of JournalChannel we are calling forceWrite(true) but it is not tracked by PowerMock
+        // because the 'spy' is applied only on return from the constructor
+        verify(jc, times(0)).forceWrite(true);
+
+        // let ForceWriteThread work
+        forceWriteThreadSuspendedLatch.countDown();
+
+        // callback should complete now
+        assertTrue(latch.await(20, TimeUnit.SECONDS));
+
+        verify(jc, atLeast(1)).forceWrite(false);
+
+        assertEquals(0, supportQueue.size());
+
+        // verify that log marker advanced
+        LastLogMark lastLogMarkAfterForceWrite = journal.getLastLogMark();
+        assertTrue(lastLogMarkAfterForceWrite.getCurMark().compare(lastLogMarkBeforeWrite) > 0);
+
+        journal.shutdown();
+    }
+
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieWriteToJournalTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieWriteToJournalTest.java
index b43df80b0..2d197d39f 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieWriteToJournalTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieWriteToJournalTest.java
@@ -20,6 +20,8 @@
  */
 package org.apache.bookkeeper.bookie;
 
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.complete;
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertSame;
 import static org.mockito.ArgumentMatchers.any;
@@ -31,6 +33,7 @@
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import java.io.File;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
@@ -72,8 +75,8 @@ public void testJournalLogAddEntryCalledCorrectly() throws Exception {
         Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(ledgerDir));
         ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
         conf.setJournalDirName(journalDir.getPath())
-            .setLedgerDirNames(new String[]{ledgerDir.getPath()})
-            .setMetadataServiceUri(null);
+                .setLedgerDirNames(new String[]{ledgerDir.getPath()})
+                .setMetadataServiceUri(null);
         BookieSocketAddress bookieAddress = Bookie.getBookieAddress(conf);
         CountDownLatch journalJoinLatch = new CountDownLatch(1);
         Journal journal = mock(Journal.class);
@@ -108,12 +111,10 @@ public void testJournalLogAddEntryCalledCorrectly() throws Exception {
         byte[] masterKey = new byte[64];
         for (boolean ackBeforeSync : new boolean[]{true, false}) {
             CountDownLatch latch = new CountDownLatch(1);
-            final ByteBuf data = Unpooled.buffer();
-            data.writeLong(ledgerId);
-            data.writeLong(entryId);
+            final ByteBuf data = buildEntry(ledgerId, entryId, -1);
             final long expectedEntryId = entryId;
             b.addEntry(data, ackBeforeSync, (int rc, long ledgerId1, long entryId1,
-                                             BookieSocketAddress addr, Object ctx) -> {
+                    BookieSocketAddress addr, Object ctx) -> {
                 assertSame(expectedCtx, ctx);
                 assertEquals(ledgerId, ledgerId1);
                 assertEquals(expectedEntryId, entryId1);
@@ -127,4 +128,73 @@ public void testJournalLogAddEntryCalledCorrectly() throws Exception {
         journalJoinLatch.countDown();
         b.shutdown();
     }
+
+    /**
+     * Test that Bookie correctly calls Journal.forceLedger and is able to return the correct LastAddPersisted entry id.
+     */
+    @Test
+    public void testForceLedger() throws Exception {
+
+        File journalDir = tempDir.newFolder();
+        Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(journalDir));
+        File ledgerDir = tempDir.newFolder();
+        Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(ledgerDir));
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setJournalDirName(journalDir.getPath())
+                .setLedgerDirNames(new String[]{ledgerDir.getPath()});
+
+        Bookie b = new Bookie(conf);
+        b.start();
+
+        long ledgerId = 1;
+        long entryId = 0;
+        Object expectedCtx = "foo";
+        byte[] masterKey = new byte[64];
+
+        CompletableFuture<Long> latchForceLedger1 = new CompletableFuture<>();
+        CompletableFuture<Long> latchForceLedger2 = new CompletableFuture<>();
+        CompletableFuture<Long> latchAddEntry = new CompletableFuture<>();
+        final ByteBuf data = buildEntry(ledgerId, entryId, -1);
+        final long expectedEntryId = entryId;
+        b.forceLedger(ledgerId, (int rc, long ledgerId1, long entryId1,
+                BookieSocketAddress addr, Object ctx) -> {
+            if (rc != BKException.Code.OK) {
+                latchForceLedger1.completeExceptionally(org.apache.bookkeeper.client.BKException.create(rc));
+                return;
+            }
+            complete(latchForceLedger1, null);
+        }, expectedCtx);
+        result(latchForceLedger1);
+
+        b.addEntry(data, true /* ackBeforesync */, (int rc, long ledgerId1, long entryId1,
+                        BookieSocketAddress addr, Object ctx) -> {
+                    if (rc != BKException.Code.OK) {
+                        latchAddEntry.completeExceptionally(org.apache.bookkeeper.client.BKException.create(rc));
+                        return;
+                    }
+                    latchAddEntry.complete(entryId);
+                }, expectedCtx, masterKey);
+        assertEquals(expectedEntryId, result(latchAddEntry).longValue());
+
+        // issue a new "forceLedger"
+        b.forceLedger(ledgerId, (int rc, long ledgerId1, long entryId1,
+                BookieSocketAddress addr, Object ctx) -> {
+            if (rc != BKException.Code.OK) {
+                latchForceLedger2.completeExceptionally(org.apache.bookkeeper.client.BKException.create(rc));
+                return;
+            }
+            complete(latchForceLedger2, null);
+        }, expectedCtx);
+        result(latchForceLedger2);
+
+        b.shutdown();
+    }
+
+    private static ByteBuf buildEntry(long ledgerId, long entryId, long lastAddConfirmed) {
+        final ByteBuf data = Unpooled.buffer();
+        data.writeLong(ledgerId);
+        data.writeLong(entryId);
+        data.writeLong(lastAddConfirmed);
+        return data;
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services