Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/03/14 18:03:12 UTC

[bookkeeper] branch master updated: ISSUE #1255: Bookie should not advance the journal marker before creating the index file

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e8980f  ISSUE #1255: Bookie should not advance the journal marker before creating the index file
7e8980f is described below

commit 7e8980fff096a9111e9365d0dc959af0625a87db
Author: Sijie Guo <si...@apache.org>
AuthorDate: Wed Mar 14 11:03:05 2018 -0700

    ISSUE #1255: Bookie should not advance the journal marker before creating the index file
    
    Descriptions of the changes in this PR:
    
    *Problem*
    
    Currently, the Bookie journals a 'new ledger' entry if a ledger doesn't exist in ledger storage. This 'new ledger' entry is journaled before the entry is added to ledger storage, which causes a problem on checkpointing.
    
    - journal the 'new ledger' entry in `getLedgerForEntry`
    
    ```
        /**
         * Retrieve the ledger descriptor for the ledger which entry should be added to.
         * The LedgerDescriptor returned from this method should be eventually freed with
         * #putHandle().
         *
         * throws BookieException if masterKey does not match the master key of the ledger
         */
        private LedgerDescriptor getLedgerForEntry(ByteBuf entry, final byte[] masterKey)
                throws IOException, BookieException {
            final long ledgerId = entry.getLong(entry.readerIndex());
    
            LedgerDescriptor l = handles.getHandle(ledgerId, masterKey);
            if (masterKeyCache.get(ledgerId) == null) {
                // Force the load into masterKey cache
                byte[] oldValue = masterKeyCache.putIfAbsent(ledgerId, masterKey);
                if (oldValue == null) {
                    // new handle, we should add the key to journal ensure we can rebuild
                    ByteBuffer bb = ByteBuffer.allocate(8 + 8 + 4 + masterKey.length);
                    bb.putLong(ledgerId);
                    bb.putLong(METAENTRY_ID_LEDGER_KEY);
                    bb.putInt(masterKey.length);
                    bb.put(masterKey);
                    bb.flip();
    
                    getJournal(ledgerId).logAddEntry(bb, false /* ackBeforeSync */, new NopWriteCallback(), null);
                }
            }
    
            return l;
        }
    ```
    
    - add the entry to ledger storage in `addEntryInternal`
    ```
        /**
         * Add an entry to a ledger as specified by handle.
         */
        private void addEntryInternal(LedgerDescriptor handle, ByteBuf entry,
                                      boolean ackBeforeSync, WriteCallback cb, Object ctx)
                throws IOException, BookieException {
            long ledgerId = handle.getLedgerId();
            long entryId = handle.addEntry(entry);
    
            writeBytes.add(entry.readableBytes());
    
            if (LOG.isTraceEnabled()) {
                LOG.trace("Adding {}@{}", entryId, ledgerId);
            }
            getJournal(ledgerId).logAddEntry(entry, ackBeforeSync, cb, ctx);
        }
    
    ```
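    
    The 'new ledger' record journaled in `getLedgerForEntry` has a fixed layout: the ledger id (8 bytes), the reserved entry id `METAENTRY_ID_LEDGER_KEY` (8 bytes), the key length (4 bytes) and the master key bytes. A minimal standalone sketch of that layout, assuming a placeholder value for `METAENTRY_ID_LEDGER_KEY`; the `isLedgerKeyEntry` and `decodeMasterKey` helpers are hypothetical and only illustrate how a reader could recognise and parse the record, they are not Bookie's replay code:
    
    ```java
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    
    public class LedgerKeyMetaEntry {
        // Placeholder value (assumption); the real constant is Bookie.METAENTRY_ID_LEDGER_KEY.
        static final long METAENTRY_ID_LEDGER_KEY = -0x1000L;
    
        // Layout: ledgerId (8) | METAENTRY_ID_LEDGER_KEY (8) | key length (4) | key bytes.
        static ByteBuffer encode(long ledgerId, byte[] masterKey) {
            ByteBuffer bb = ByteBuffer.allocate(8 + 8 + 4 + masterKey.length);
            bb.putLong(ledgerId);
            bb.putLong(METAENTRY_ID_LEDGER_KEY);
            bb.putInt(masterKey.length);
            bb.put(masterKey);
            bb.flip(); // position = 0, limit = bytes written, ready to be read
            return bb;
        }
    
        // Hypothetical helper: the second long marks this as a master-key record.
        static boolean isLedgerKeyEntry(ByteBuffer bb) {
            return bb.getLong(bb.position() + 8) == METAENTRY_ID_LEDGER_KEY;
        }
    
        // Hypothetical helper: reads the key from a copy so the caller's position is unchanged.
        static byte[] decodeMasterKey(ByteBuffer bb) {
            ByteBuffer view = bb.duplicate();
            view.getLong(); // ledger id
            view.getLong(); // meta entry id
            byte[] key = new byte[view.getInt()];
            view.get(key);
            return key;
        }
    
        public static void main(String[] args) {
            byte[] key = "secret".getBytes(StandardCharsets.UTF_8);
            ByteBuffer bb = encode(42L, key);
            System.out.println(bb.remaining());                          // 26
            System.out.println(bb.getLong(0));                           // 42
            System.out.println(isLedgerKeyEntry(bb));                    // true
            System.out.println(Arrays.equals(key, decodeMasterKey(bb))); // true
        }
    }
    ```
    
    Because `flip()` leaves the position at 0 and the limit at the written size, `remaining()` is 8 + 8 + 4 plus the key length, which is what the journal writes.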
    
    The problematic sequence is as follows:
    
    - thread t1 is adding the first entry of ledger L1
    - thread t2 is adding entries of ledger L2
    - t1 adds the journal entry for 'new ledger L1'
    - t2 adds its entries after t1 has added that journal entry but before t1 adds the entry to ledger storage
    - after t2 has added its entries, a checkpoint happens in the ledger storage. It rolls the journal mark past the 'new ledger L1' entry, treating that entry as persisted even though L1 does not exist in ledger storage yet.
    - if the bookie restarts, it fails with a "no such ledger" exception, because the 'new ledger L1' entry sits before the journal mark and is not replayed.
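    
    The sequence above can be replayed deterministically in a toy model. This is a sketch under simplifying assumptions, not BookKeeper code: ledger storage is reduced to flushed and unflushed sets of ledger ids, a checkpoint flushes storage and moves the mark to the end of the journal, and a restart drops unflushed state and replays journal records from the mark. The hypothetical `journalKeyBeforeStorage` flag switches between the current ordering and the reordering described under *Solution*:
    
    ```java
    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    
    public class CheckpointRaceModel {
        // A journal record: either the 'new ledger' (master key) record or a data entry.
        static final class Rec {
            final long ledgerId;
            final boolean newLedger;
    
            Rec(long ledgerId, boolean newLedger) {
                this.ledgerId = ledgerId;
                this.newLedger = newLedger;
            }
        }
    
        final List<Rec> journal = new ArrayList<>();
        final Set<Long> unflushed = new HashSet<>(); // ledgers only in memory in ledger storage
        final Set<Long> flushed = new HashSet<>();   // ledgers durable in ledger storage
        int mark = 0;                                // journal replay starts at this index
    
        void addToStorage(long ledgerId) {
            unflushed.add(ledgerId);
        }
    
        // Checkpoint: flush ledger storage, then roll the journal mark to the current end.
        void checkpoint() {
            flushed.addAll(unflushed);
            unflushed.clear();
            mark = journal.size();
        }
    
        // Crash and restart: unflushed state is lost and the journal is replayed from the mark.
        // Returns false if a data entry's ledger is neither flushed nor re-created by replay.
        boolean restartSucceeds() {
            Set<Long> known = new HashSet<>(flushed);
            for (Rec r : journal.subList(mark, journal.size())) {
                if (r.newLedger) {
                    known.add(r.ledgerId);
                } else if (!known.contains(r.ledgerId)) {
                    return false; // "no such ledger"
                }
            }
            return true;
        }
    
        // t1 writes the first entry of L1 while t2 writes L2 and a checkpoint runs in between.
        static boolean scenario(boolean journalKeyBeforeStorage) {
            CheckpointRaceModel m = new CheckpointRaceModel();
            long l1 = 1L;
            long l2 = 2L;
            if (journalKeyBeforeStorage) {
                m.journal.add(new Rec(l1, true)); // old order: 'new ledger L1' journaled first
            }
            // t1 is paused here; t2 writes L2 and a checkpoint rolls the mark.
            m.addToStorage(l2);
            m.journal.add(new Rec(l2, false));
            m.checkpoint();
            // t1 resumes: the L1 entry reaches storage and the journal after the checkpoint.
            m.addToStorage(l1);
            if (!journalKeyBeforeStorage) {
                m.journal.add(new Rec(l1, true)); // new order: key journaled after the storage add
            }
            m.journal.add(new Rec(l1, false));
            return m.restartSucceeds();           // crash before the next checkpoint
        }
    
        public static void main(String[] args) {
            System.out.println(scenario(true));  // false
            System.out.println(scenario(false)); // true
        }
    }
    ```
    
    With the old ordering, the only L1 record left after the mark is the data entry, so replay cannot find L1; with the new ordering, the 'new ledger L1' record lands after the mark and is replayed.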
    
    The problem can be reproduced using a unit test: https://github.com/sijie/bookkeeper/commit/5053a717cd578aeb88236d373553d7494501b801
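    
    `CheckpointOnNewLedgersTest` in this commit forces the same window by pausing t1 inside a Mockito spy of `getLedgerForEntry` with two `CountDownLatch`es: one signals that t1 has reached the pause point, the other releases it. A minimal standalone sketch of that handshake, with hypothetical step names standing in for the bookie calls:
    
    ```java
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    public class PausedWriterDemo {
        // Runs t1 in two steps and uses the gap between them to act as t2 on the calling thread.
        static List<String> run() {
            List<String> events = Collections.synchronizedList(new ArrayList<>());
            CountDownLatch reachedPause = new CountDownLatch(1); // t1 -> caller: "paused"
            CountDownLatch resume = new CountDownLatch(1);       // caller -> t1: "continue"
    
            Thread t1 = new Thread(() -> {
                events.add("t1: journal key");      // hypothetical first step
                reachedPause.countDown();
                try {
                    resume.await();                 // parked until the caller releases it
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                events.add("t1: add to storage");   // hypothetical second step
            }, "ledger-1-writer");
    
            t1.start();
            try {
                reachedPause.await();               // t1 is now between its two steps
                events.add("t2: write + checkpoint");
                resume.countDown();
                t1.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted", e);
            }
            return new ArrayList<>(events);
        }
    
        public static void main(String[] args) {
            System.out.println(run());
            // [t1: journal key, t2: write + checkpoint, t1: add to storage]
        }
    }
    ```
    
    The two latches make the interleaving deterministic, so the test does not depend on thread scheduling to hit the window.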
    
    *Solution*
    
    The fix is simple: make sure the 'new ledger' journal entry is added after the entry is added to ledger storage. This ensures that when a checkpoint happens, it flushes and creates the ledger before moving the journal mark.
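    
    A minimal sketch of the resulting order, assuming a toy event log in place of ledger storage and the journal (`FixedOrderingSketch` and its string events are hypothetical): each write goes to storage first, then only the thread that wins `putIfAbsent` on `masterKeyCache` journals the 'new ledger' record, then the entry itself is journaled. The real `addEntryInternal` also runs under `synchronized (handle)`; the sketch leaves that out to show that `putIfAbsent` alone keeps the record unique:
    
    ```java
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    public class FixedOrderingSketch {
        final ConcurrentMap<Long, byte[]> masterKeyCache = new ConcurrentHashMap<>();
        // Toy stand-in for ledger storage and the journal: one ordered event log.
        final List<String> events = Collections.synchronizedList(new ArrayList<>());
    
        // Same order as the patched addEntryInternal: storage, then the one-time key record, then the entry.
        void addEntry(long ledgerId, long entryId, byte[] masterKey) {
            events.add("storage:" + ledgerId + ":" + entryId);
            if (masterKeyCache.get(ledgerId) == null
                    && masterKeyCache.putIfAbsent(ledgerId, masterKey) == null) {
                events.add("journal-key:" + ledgerId); // only the putIfAbsent winner journals the key
            }
            events.add("journal-entry:" + ledgerId + ":" + entryId);
        }
    
        // Several writers append to ledger 1 concurrently; returns a snapshot of the event log.
        static List<String> runConcurrently(int writers, int entriesPerWriter) {
            FixedOrderingSketch s = new FixedOrderingSketch();
            ExecutorService pool = Executors.newFixedThreadPool(writers);
            for (int w = 0; w < writers; w++) {
                final long base = (long) w * entriesPerWriter;
                pool.execute(() -> {
                    for (int i = 0; i < entriesPerWriter; i++) {
                        s.addEntry(1L, base + i, new byte[0]);
                    }
                });
            }
            pool.shutdown();
            try {
                if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("writers did not finish");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted", e);
            }
            return new ArrayList<>(s.events);
        }
    
        // Invariant: exactly one key record, and it comes after the first storage write.
        static boolean keyJournaledOnceAfterStorage(List<String> events) {
            int firstStorage = -1;
            int keyIndex = -1;
            int keyCount = 0;
            for (int i = 0; i < events.size(); i++) {
                String e = events.get(i);
                if (firstStorage < 0 && e.startsWith("storage:1:")) {
                    firstStorage = i;
                }
                if (e.equals("journal-key:1")) {
                    keyIndex = i;
                    keyCount++;
                }
            }
            return keyCount == 1 && firstStorage >= 0 && keyIndex > firstStorage;
        }
    
        public static void main(String[] args) {
            List<String> events = runConcurrently(4, 100);
            System.out.println(events.size());                        // 801
            System.out.println(keyJournaledOnceAfterStorage(events)); // true
        }
    }
    ```
    
    With 4 writers of 100 entries each, the log holds 801 events: one storage and one journal event per entry, plus a single 'new ledger' record that always follows a storage write.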
    
    Author: Sijie Guo <si...@apache.org>
    
    Reviewers: Ivan Kelly <iv...@apache.org>, Charan Reddy Guttapalem <re...@gmail.com>, Jia Zhai <None>, Yiming Zang <yz...@gmail.com>, Matteo Merli <mm...@apache.org>
    
    This closes #1256 from sijie/fix_db_ledger_storage_checkpoint, closes #1255
---
 .../java/org/apache/bookkeeper/bookie/Bookie.java  |  46 ++---
 .../org/apache/bookkeeper/bookie/SyncThread.java   |   3 +
 .../bookie/CheckpointOnNewLedgersTest.java         | 197 +++++++++++++++++++++
 3 files changed, 224 insertions(+), 22 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 58e1d1c..e6d090e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -1096,28 +1096,12 @@ public class Bookie extends BookieCriticalThread {
      *
      * @throws BookieException if masterKey does not match the master key of the ledger
      */
-    private LedgerDescriptor getLedgerForEntry(ByteBuf entry, final byte[] masterKey)
+    @VisibleForTesting
+    LedgerDescriptor getLedgerForEntry(ByteBuf entry, final byte[] masterKey)
             throws IOException, BookieException {
         final long ledgerId = entry.getLong(entry.readerIndex());
 
-        LedgerDescriptor l = handles.getHandle(ledgerId, masterKey);
-        if (masterKeyCache.get(ledgerId) == null) {
-            // Force the load into masterKey cache
-            byte[] oldValue = masterKeyCache.putIfAbsent(ledgerId, masterKey);
-            if (oldValue == null) {
-                // new handle, we should add the key to journal ensure we can rebuild
-                ByteBuffer bb = ByteBuffer.allocate(8 + 8 + 4 + masterKey.length);
-                bb.putLong(ledgerId);
-                bb.putLong(METAENTRY_ID_LEDGER_KEY);
-                bb.putInt(masterKey.length);
-                bb.put(masterKey);
-                bb.flip();
-
-                getJournal(ledgerId).logAddEntry(bb, false /* ackBeforeSync */, new NopWriteCallback(), null);
-            }
-        }
-
-        return l;
+        return handles.getHandle(ledgerId, masterKey);
     }
 
     private Journal getJournal(long ledgerId) {
@@ -1128,13 +1112,31 @@ public class Bookie extends BookieCriticalThread {
      * Add an entry to a ledger as specified by handle.
      */
     private void addEntryInternal(LedgerDescriptor handle, ByteBuf entry,
-                                  boolean ackBeforeSync, WriteCallback cb, Object ctx)
+                                  boolean ackBeforeSync, WriteCallback cb, Object ctx, byte[] masterKey)
             throws IOException, BookieException {
         long ledgerId = handle.getLedgerId();
         long entryId = handle.addEntry(entry);
 
         writeBytes.add(entry.readableBytes());
 
+        // journal `addEntry` should happen after the entry is added to ledger storage.
+        // otherwise the journal entry can potentially be rolled before the ledger is created in ledger storage.
+        if (masterKeyCache.get(ledgerId) == null) {
+            // Force the load into masterKey cache
+            byte[] oldValue = masterKeyCache.putIfAbsent(ledgerId, masterKey);
+            if (oldValue == null) {
+                // new handle, we should add the key to journal ensure we can rebuild
+                ByteBuffer bb = ByteBuffer.allocate(8 + 8 + 4 + masterKey.length);
+                bb.putLong(ledgerId);
+                bb.putLong(METAENTRY_ID_LEDGER_KEY);
+                bb.putInt(masterKey.length);
+                bb.put(masterKey);
+                bb.flip();
+
+                getJournal(ledgerId).logAddEntry(bb, false /* ackBeforeSync */, new NopWriteCallback(), null);
+            }
+        }
+
         if (LOG.isTraceEnabled()) {
             LOG.trace("Adding {}@{}", entryId, ledgerId);
         }
@@ -1156,7 +1158,7 @@ public class Bookie extends BookieCriticalThread {
             LedgerDescriptor handle = getLedgerForEntry(entry, masterKey);
             synchronized (handle) {
                 entrySize = entry.readableBytes();
-                addEntryInternal(handle, entry, false /* ackBeforeSync */, cb, ctx);
+                addEntryInternal(handle, entry, false /* ackBeforeSync */, cb, ctx, masterKey);
             }
             success = true;
         } catch (NoWritableLedgerDirException e) {
@@ -1216,7 +1218,7 @@ public class Bookie extends BookieCriticalThread {
                             .create(BookieException.Code.LedgerFencedException);
                 }
                 entrySize = entry.readableBytes();
-                addEntryInternal(handle, entry, ackBeforeSync, cb, ctx);
+                addEntryInternal(handle, entry, ackBeforeSync, cb, ctx, masterKey);
             }
             success = true;
         } catch (NoWritableLedgerDirException e) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
index eeba1c4..466d46a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
@@ -29,6 +29,8 @@ import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import lombok.AccessLevel;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
@@ -56,6 +58,7 @@ import org.apache.bookkeeper.util.MathUtils;
 @Slf4j
 class SyncThread implements Checkpointer {
 
+    @Getter(AccessLevel.PACKAGE)
     final ScheduledExecutorService executor;
     final LedgerStorage ledgerStorage;
     final LedgerDirsListener dirsListener;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CheckpointOnNewLedgersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CheckpointOnNewLedgersTest.java
new file mode 100644
index 0000000..e0a8289
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CheckpointOnNewLedgersTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.bookie;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.File;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.bookie.Journal.LastLogMark;
+import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Test the checkpoint logic of {@link DbLedgerStorage}.
+ */
+@Slf4j
+public class CheckpointOnNewLedgersTest {
+
+    @Rule
+    public final TemporaryFolder testDir = new TemporaryFolder();
+
+    private ServerConfiguration conf;
+    private Bookie bookie;
+    private CountDownLatch getLedgerDescCalledLatch;
+    private CountDownLatch getLedgerDescWaitLatch;
+
+    @Before
+    public void setup() throws Exception {
+        File bkDir = testDir.newFolder("dbLedgerStorageCheckpointTest");
+        File curDir = Bookie.getCurrentDirectory(bkDir);
+        Bookie.checkDirectoryStructure(curDir);
+
+        int gcWaitTime = 1000;
+        conf = TestBKConfiguration.newServerConfiguration();
+        conf.setGcWaitTime(gcWaitTime);
+        conf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName());
+        conf.setJournalDirsName(new String[] { bkDir.toString() });
+        conf.setLedgerDirNames(new String[] { bkDir.toString() });
+        conf.setEntryLogSizeLimit(10 * 1024);
+
+        bookie = spy(new Bookie(conf));
+        bookie.start();
+
+        getLedgerDescCalledLatch = new CountDownLatch(1);
+        getLedgerDescWaitLatch = new CountDownLatch(1);
+
+        // spy `getLedgerForEntry`
+        doAnswer(invocationOnMock -> {
+            ByteBuf entry = invocationOnMock.getArgument(0);
+            long ledgerId = entry.getLong(entry.readerIndex());
+
+            LedgerDescriptor ld = (LedgerDescriptor) invocationOnMock.callRealMethod();
+
+            if (ledgerId % 2 == 1) {
+                getLedgerDescCalledLatch.countDown();
+                getLedgerDescWaitLatch.await();
+            }
+
+            return ld;
+        }).when(bookie).getLedgerForEntry(
+            any(ByteBuf.class),
+            any(byte[].class));
+    }
+
+    @After
+    public void teardown() throws Exception {
+        if (null != bookie) {
+            bookie.shutdown();
+        }
+    }
+
+    private static ByteBuf createByteBuf(long ledgerId, long entryId, int entrySize) {
+        byte[] data = new byte[entrySize];
+        ThreadLocalRandom.current().nextBytes(data);
+        ByteBuf buffer = Unpooled.wrappedBuffer(data);
+        buffer.writerIndex(0);
+        buffer.writeLong(ledgerId);
+        buffer.writeLong(entryId);
+        buffer.writeLong(entryId - 1); // lac
+        buffer.writerIndex(entrySize);
+        return buffer;
+    }
+
+    @Test
+    public void testCheckpoint() throws Exception {
+        int entrySize = 1024;
+        long l1 = 1L;
+        long l2 = 2L;
+
+        final CountDownLatch writeL1Latch = new CountDownLatch(1);
+
+        Thread t1 = new Thread(() -> {
+
+            ByteBuf entry = createByteBuf(l1, 0L, entrySize);
+            try {
+                bookie.addEntry(
+                    entry,
+                    false,
+                    (rc, ledgerId, entryId, addr, ctx) -> writeL1Latch.countDown(),
+                    null,
+                    new byte[0]
+                );
+            } catch (Exception e) {
+                log.info("Failed to write entry to l1", e);
+            }
+
+        }, "ledger-1-writer");
+
+        t1.start();
+
+        // wait until the ledger desc is opened
+        getLedgerDescCalledLatch.await();
+
+        LastLogMark logMark = bookie.journals.get(0).getLastLogMark().markLog();
+
+        // keep write entries to l2 to trigger entry log rolling to checkpoint
+        int numEntries = 10;
+        final CountDownLatch writeL2Latch = new CountDownLatch(numEntries);
+        for (int i = 0; i < numEntries; i++) {
+            ByteBuf entry = createByteBuf(l2, i, entrySize);
+            bookie.addEntry(
+                entry,
+                false,
+                (rc, ledgerId, entryId, addr, ctx) -> writeL2Latch.countDown(),
+                null,
+                new byte[0]);
+        }
+        writeL2Latch.await();
+
+        // wait until checkpoint to complete and journal marker is rolled.
+        bookie.syncThread.getExecutor().submit(() -> {}).get();
+
+        log.info("Wait until checkpoint is completed");
+
+        // the journal mark is rolled.
+        LastLogMark newLogMark = bookie.journals.get(0).getLastLogMark().markLog();
+        assertTrue(newLogMark.getCurMark().compare(logMark.getCurMark()) > 0);
+
+        // resume l1-writer to continue writing the entries
+        getLedgerDescWaitLatch.countDown();
+
+        // wait until the l1 entry is written
+        writeL1Latch.await();
+        t1.join();
+
+        // construct a new bookie to simulate "bookie restart from crash"
+        Bookie newBookie = new Bookie(conf);
+        newBookie.start();
+
+        for (int i = 0; i < numEntries; i++) {
+            ByteBuf entry = newBookie.readEntry(l2, i);
+            assertNotNull(entry);
+            assertEquals(l2, entry.readLong());
+            assertEquals((long) i, entry.readLong());
+            entry.release();
+        }
+
+        ByteBuf entry = newBookie.readEntry(l1, 0L);
+        assertNotNull(entry);
+        assertEquals(l1, entry.readLong());
+        assertEquals(0L, entry.readLong());
+        entry.release();
+        newBookie.shutdown();
+    }
+
+}
