Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/10/31 08:20:34 UTC

[bookkeeper] branch branch-4.7 updated: ISSUE #1757: prevent race between flush and delete from recreating index

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/branch-4.7 by this push:
     new 1c54206  ISSUE #1757: prevent race between flush and delete from recreating index
1c54206 is described below

commit 1c54206c985a4d361e8bffa4cf9de4265a1bfc45
Author: Samuel Just <sj...@salesforce.com>
AuthorDate: Wed Oct 31 01:20:30 2018 -0700

    ISSUE #1757: prevent race between flush and delete from recreating index
    
    IndexPersistenceManager.flushLedgerHandle can race with delete by
    obtaining a FileInfo just prior to delete and then proceeding to rewrite
    the FileInfo, resurrecting it.  FileInfo provides a convenient point of
    synchronization for avoiding this race.  FileInfo.moveLedgerIndexFile,
    FileInfo.flushHeader, and FileInfo.delete() are synchronized already, so
    this patch simply adds a deleted flag to the FileInfo object to indicate
    that the FileInfo has become invalid.  checkOpen is called in every
    method and will now throw FileInfoDeletedException if delete has been
    called.  IndexPersistenceManager can catch it and deal with it
    appropriately in flush (which generally means moving on to the next
    ledger).
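The deleted-flag scheme described above can be sketched in isolation. This is a minimal, hypothetical model rather than the BookKeeper code: SketchFileInfo, flushOrSkip and headerWrites are invented names, and only the synchronized flag check is modelled (no file I/O). Because delete() and checkOpen() synchronize on the same object, a flusher that obtained the FileInfo before the delete gets an exception instead of rewriting the file.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

public class DeletedFlagSketch {

    // Hypothetical stand-in for FileInfo: only the deleted-flag logic is modelled, no file I/O.
    static class SketchFileInfo {
        static class FileInfoDeletedException extends IOException {
            FileInfoDeletedException() {
                super("FileInfo already deleted");
            }
        }

        private boolean deleted = false;
        private int headerWrites = 0; // counts successful (simulated) header flushes

        // Mirrors the patched checkOpen: refuse any use once delete() has run.
        private synchronized void checkOpen() throws IOException {
            if (deleted) {
                throw new FileInfoDeletedException();
            }
        }

        synchronized void flushHeader() throws IOException {
            checkOpen();
            headerWrites++;
        }

        synchronized boolean delete() {
            deleted = true;
            return true; // the real method returns lf.delete()
        }

        synchronized int headerWrites() {
            return headerWrites;
        }
    }

    // Hypothetical flusher: returns false (skip this ledger) if it was deleted concurrently.
    static boolean flushOrSkip(SketchFileInfo fi) {
        try {
            fi.flushHeader();
            return true;
        } catch (SketchFileInfo.FileInfoDeletedException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        SketchFileInfo fi = new SketchFileInfo(); // a flusher obtains the FileInfo...
        System.out.println(flushOrSkip(fi));      // true
        fi.delete();                              // ...then a deleter runs
        System.out.println(flushOrSkip(fi));      // false: skipped, not resurrected
        System.out.println(fi.headerWrites());    // 1
    }
}
```

In the patch itself, IndexPersistenceMgr translates FileInfoDeletedException into Bookie.NoLedgerException, which flushOneOrMoreLedgers catches before moving on to the next ledger.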
    
    This patch also eliminates ledgersToFlush and ledgersFlushing.  Their
    purpose appears to have been to let delete drop a ledger that had been
    selected for flushing but not yet flushed, avoiding the above race.
    That is not sufficient, however, because IndexInMemPageMgr calls
    IndexPersistenceManager.flushLedgerHeader, which can obtain a FileInfo
    for the ledger prior to the deletion and then call
    relocateIndexFileAndFlushHeader afterwards.
    
    Also, if the purpose was to avoid concurrent calls into
    flushSpecificLedger on the same ledger, it's wrong because of the
    following sequence:
    
    t0: thread 0 calls flushOneOrMoreLedgers
    t1: thread 0 places ledger 10 into ledgersFlushing and completes
    flushSpecificLedger
    t2: thread 2 performs a write to ledger 10
    t3: thread 1 calls flushOneOrMoreLedgers skipping ledger 10
    t4: thread 0 releases ledger 10 from ledgersFlushing
    t5: thread 1 completes flushOneOrMoreLedgers
    
    Although thread 1 begins to flush after the write to ledger 10, it won't
    capture the write, rendering the flush incorrect.  I don't think it's
    actually worth avoiding overlapping flushes here because both FileInfo
    and LedgerEntryPage track dirty state.  As such, it seems simpler to
    just get rid of them.
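The t0..t5 sequence can be replayed deterministically on a single thread to show why the skip set loses the write. This is a hypothetical model, not BookKeeper code: Ledger, memVersion, diskVersion and replay are invented for illustration, and a flush is reduced to copying the in-memory version to the on-disk version.

```java
import java.util.HashSet;
import java.util.Set;

public class SkipSetTimeline {

    // Hypothetical model of one ledger's index: the in-memory version vs. the persisted one.
    static final class Ledger {
        int memVersion = 0;
        int diskVersion = 0;
    }

    // Replays t0..t5 on one thread. useSkipSet = true models the removed ledgersFlushing
    // guard; false models the patch, where every flusher flushes every active ledger.
    static Ledger replay(boolean useSkipSet) {
        Set<Long> ledgersFlushing = new HashSet<>();
        Ledger ledger10 = new Ledger();
        long id = 10L;

        // t0/t1: thread 0 claims ledger 10 and completes flushSpecificLedger.
        ledgersFlushing.add(id);
        ledger10.diskVersion = ledger10.memVersion;

        // t2: thread 2 writes to ledger 10, making it dirty again.
        ledger10.memVersion++;

        // t3: thread 1 starts flushing after the write; with the guard it skips ledger 10.
        boolean thread1Flushes = !useSkipSet || !ledgersFlushing.contains(id);
        if (thread1Flushes) {
            ledger10.diskVersion = ledger10.memVersion;
        }

        // t4: thread 0 releases ledger 10; t5: thread 1 returns.
        ledgersFlushing.remove(id);
        return ledger10;
    }

    public static void main(String[] args) {
        Ledger guarded = replay(true);
        Ledger patched = replay(false);
        System.out.println("guarded: mem=" + guarded.memVersion + " disk=" + guarded.diskVersion); // guarded: mem=1 disk=0
        System.out.println("patched: mem=" + patched.memVersion + " disk=" + patched.diskVersion); // patched: mem=1 disk=1
    }
}
```

Without the guard, two flushers may overlap on the same ledger; the commit argues that this is acceptable because FileInfo and LedgerEntryPage already track dirty state.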
    
    This patch also adds a more aggressive version of testFlushDeleteRace to
    test the new behavior.  Testing with multiple flushers turned up a bug
    in LedgerEntryPage.getPageToWrite, which didn't return a buffer with an
    independent read pointer for each caller, so this patch addresses that
    as well.
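The getPageToWrite fix rests on ByteBuffer.duplicate(), which returns a new buffer that shares the same content but has its own position, limit and mark. The sketch below is hypothetical (Page, sharedPageToWrite, duplicatedPageToWrite, drain and twoFlushers are invented names) and replaces the real file write with a drain() that simply advances the buffer's position.

```java
import java.nio.ByteBuffer;

public class DuplicateSketch {

    // Hypothetical stand-in for LedgerEntryPage: one page buffer shared by all flushers.
    static final class Page {
        private final ByteBuffer page = ByteBuffer.allocate(16);

        // Old getPageToWrite: every caller receives the same ByteBuffer object.
        ByteBuffer sharedPageToWrite() {
            page.clear();
            return page;
        }

        // Patched getPageToWrite: duplicate() shares the bytes but gives each caller
        // its own position, limit and mark.
        ByteBuffer duplicatedPageToWrite() {
            page.clear();
            return page.duplicate();
        }
    }

    // Simulates a write that consumes every remaining byte, as a channel write might.
    static int drain(ByteBuffer buf) {
        int n = buf.remaining();
        buf.position(buf.limit());
        return n;
    }

    // Two flushers fetch the page, then both write it; returns the bytes each one wrote.
    static int[] twoFlushers(boolean useDuplicate) {
        Page p = new Page();
        ByteBuffer a = useDuplicate ? p.duplicatedPageToWrite() : p.sharedPageToWrite();
        ByteBuffer b = useDuplicate ? p.duplicatedPageToWrite() : p.sharedPageToWrite();
        return new int[] {drain(a), drain(b)};
    }

    public static void main(String[] args) {
        int[] shared = twoFlushers(false);
        int[] dup = twoFlushers(true);
        System.out.println("shared: " + shared[0] + ", " + shared[1]); // shared: 16, 0
        System.out.println("duplicate: " + dup[0] + ", " + dup[1]);    // duplicate: 16, 16
    }
}
```

In this model the second flusher sharing the original buffer sees nothing left to write, while with duplicate() each flusher writes the full page.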
    
    (bug W-5549455)
    (rev cguttapalem)
    Signed-off-by: Samuel Just <sjustsalesforce.com>
    (cherry picked from commit 7b5ac3d5e76ac4df618764cafe80aef2994703ec)
    
    Author:
    
    Reviewers: Enrico Olivelli <eolivelligmail.com>, Sijie Guo <sijieapache.org>
    
    This closes #1769 from athanatos/forupstream/wip-1757, closes #1757
    
    (cherry picked from commit 41e4bccb9694e1f373e919f6891b8e88b2232c5e)
    
    Conflicts:
        bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
    
    Minor conflict over fileInfoVersionToWrite from the explicit lac patch.
    
    Reviewers: Sijie Guo <si...@apache.org>
    
    This closes #1774 from athanatos/forupstream/wip-1757-4.7, closes #1757
---
 .../org/apache/bookkeeper/bookie/FileInfo.java     | 17 +++++
 .../bookkeeper/bookie/FileInfoBackingCache.java    |  2 +
 .../bookkeeper/bookie/IndexInMemPageMgr.java       | 22 ++----
 .../bookkeeper/bookie/IndexPersistenceMgr.java     | 16 ++++-
 .../apache/bookkeeper/bookie/LedgerEntryPage.java  |  3 +-
 .../apache/bookkeeper/bookie/LedgerCacheTest.java  | 83 +++++++++++++++-------
 6 files changed, 95 insertions(+), 48 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
index eb6d9e1..b6794a3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
@@ -91,12 +91,15 @@ class FileInfo extends Watchable<LastAddConfirmedUpdateNotification> {
     // file access mode
     protected String mode;
 
+    private boolean deleted;
+
     public FileInfo(File lf, byte[] masterKey) throws IOException {
         super(WATCHER_RECYCLER);
 
         this.lf = lf;
         this.masterKey = masterKey;
         mode = "rw";
+        this.deleted = false;
     }
 
     synchronized Long getLastAddConfirmed() {
@@ -224,6 +227,16 @@ class FileInfo extends Watchable<LastAddConfirmedUpdateNotification> {
         }
     }
 
+    public synchronized boolean isDeleted() {
+        return deleted;
+    }
+
+    public static class FileInfoDeletedException extends IOException {
+        FileInfoDeletedException() {
+            super("FileInfo already deleted");
+        }
+    }
+
     @VisibleForTesting
     void checkOpen(boolean create) throws IOException {
         checkOpen(create, false);
@@ -231,6 +244,9 @@ class FileInfo extends Watchable<LastAddConfirmedUpdateNotification> {
 
     private synchronized void checkOpen(boolean create, boolean openBeforeClose)
             throws IOException {
+        if (deleted) {
+            throw new FileInfoDeletedException();
+        }
         if (fc != null) {
             return;
         }
@@ -497,6 +513,7 @@ class FileInfo extends Watchable<LastAddConfirmedUpdateNotification> {
     }
 
     public synchronized boolean delete() {
+        deleted = true;
         return lf.delete();
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfoBackingCache.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfoBackingCache.java
index 31fd077..fa81db3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfoBackingCache.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfoBackingCache.java
@@ -47,6 +47,8 @@ class FileInfoBackingCache {
         boolean retained = fi.tryRetain();
         if (!retained) {
             throw new IOException("FileInfo " + fi + " is already marked dead");
+        } else if (fi.isDeleted()) {
+            throw new Bookie.NoLedgerException(fi.ledgerId);
         }
         return fi;
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java
index 66e97f7..0cf5cc9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java
@@ -40,7 +40,6 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -345,12 +344,6 @@ class IndexInMemPageMgr {
     // flush and read pages
     private final IndexPersistenceMgr indexPersistenceManager;
 
-    /**
-     * the list of potentially dirty ledgers.
-     */
-    private final ConcurrentLinkedQueue<Long> ledgersToFlush = new ConcurrentLinkedQueue<Long>();
-    private final ConcurrentSkipListSet<Long> ledgersFlushing = new ConcurrentSkipListSet<Long>();
-
     // Stats
     private final Counter ledgerCacheHitCounter;
     private final Counter ledgerCacheMissCounter;
@@ -504,7 +497,6 @@ class IndexInMemPageMgr {
 
     void removePagesForLedger(long ledgerId) {
         pageMapAndList.removeEntriesForALedger(ledgerId);
-        ledgersToFlush.remove(ledgerId);
     }
 
     long getLastEntryInMem(long ledgerId) {
@@ -542,18 +534,12 @@ class IndexInMemPageMgr {
     }
 
     void flushOneOrMoreLedgers(boolean doAll) throws IOException {
-        if (ledgersToFlush.isEmpty()) {
-            ledgersToFlush.addAll(pageMapAndList.getActiveLedgers());
-        }
-        Long potentiallyDirtyLedger;
-        while (null != (potentiallyDirtyLedger = ledgersToFlush.poll())) {
-            if (!ledgersFlushing.add(potentiallyDirtyLedger)) {
-                continue;
-            }
+        List<Long> ledgersToFlush = new ArrayList<>(pageMapAndList.getActiveLedgers());
+        for (Long potentiallyDirtyLedger : ledgersToFlush) {
             try {
                 flushSpecificLedger(potentiallyDirtyLedger);
-            } finally {
-                ledgersFlushing.remove(potentiallyDirtyLedger);
+            } catch (Bookie.NoLedgerException e) {
+                continue;
             }
             if (!doAll) {
                 break;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
index eb3b935..0f21709 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
@@ -511,7 +511,12 @@ public class IndexPersistenceMgr {
 
     private void moveLedgerIndexFile(Long l, FileInfo fi) throws NoWritableLedgerDirException, IOException {
         File newLedgerIndexFile = getNewLedgerIndexFile(l, getLedgerDirForLedger(fi));
-        fi.moveToNewLocation(newLedgerIndexFile, fi.getSizeSinceLastwrite());
+        try {
+            fi.moveToNewLocation(newLedgerIndexFile, fi.getSizeSinceLastwrite());
+        } catch (FileInfo.FileInfoDeletedException fileInfoDeleted) {
+            // File concurrently deleted
+            throw new Bookie.NoLedgerException(l);
+        }
     }
 
     void flushLedgerHeader(long ledger) throws IOException {
@@ -585,7 +590,7 @@ public class IndexPersistenceMgr {
 
     private void writeBuffers(Long ledger,
                               List<LedgerEntryPage> entries, FileInfo fi,
-                              int start, int count) throws IOException {
+                              int start, int count) throws IOException, Bookie.NoLedgerException {
         if (LOG.isTraceEnabled()) {
             LOG.trace("Writing {} buffers of {}", count, Long.toHexString(ledger));
         }
@@ -602,7 +607,12 @@ public class IndexPersistenceMgr {
         }
         long totalWritten = 0;
         while (buffs[buffs.length - 1].remaining() > 0) {
-            long rc = fi.write(buffs, entries.get(start + 0).getFirstEntryPosition());
+            long rc = 0;
+            try {
+                rc = fi.write(buffs, entries.get(start + 0).getFirstEntryPosition());
+            } catch (FileInfo.FileInfoDeletedException e) {
+                throw new Bookie.NoLedgerException(ledger);
+            }
             if (rc <= 0) {
                 throw new IOException("Short write to ledger " + ledger + " rc = " + rc);
             }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java
index a9cef72..be87559 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java
@@ -207,7 +207,8 @@ public class LedgerEntryPage {
     public ByteBuffer getPageToWrite() {
         checkPage();
         page.clear();
-        return page;
+        // Different callers to this method should be able to reasonably expect independent read pointers
+        return page.duplicate();
     }
 
     long getLedger() {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
index 1c51209..9ae8633 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
@@ -375,24 +375,33 @@ public class LedgerCacheTest {
     }
 
     /**
-     * Race where a flush would fail because a garbage collection occurred at
-     * the wrong time.
+     * Test for race between delete and flush.
      * {@link https://issues.apache.org/jira/browse/BOOKKEEPER-604}
+     * {@link https://github.com/apache/bookkeeper/issues/1757}
      */
     @Test
     public void testFlushDeleteRace() throws Exception {
         newLedgerCache();
         final AtomicInteger rc = new AtomicInteger(0);
-        final LinkedBlockingQueue<Long> ledgerQ = new LinkedBlockingQueue<Long>(1);
+        final LinkedBlockingQueue<Long> ledgerQ = new LinkedBlockingQueue<>(100);
         final byte[] masterKey = "masterKey".getBytes();
+        final long numLedgers = 1000;
+        final int numFlushers = 10;
+        final int numDeleters = 10;
+        final AtomicBoolean running = new AtomicBoolean(true);
         Thread newLedgerThread = new Thread() {
                 public void run() {
                     try {
-                        for (int i = 0; i < 1000 && rc.get() == 0; i++) {
+                        for (long i = 0; i < numLedgers && rc.get() == 0; i++) {
                             ledgerCache.setMasterKey(i, masterKey);
-                            ledgerQ.put((long) i);
+
+                            ledgerCache.putEntryOffset(i, 1, 0);
+                            ledgerQ.put(i);
+                        }
+                        for (int i = 0; i < numDeleters; ++i) {
+                            ledgerQ.put(-1L);
                         }
-                    } catch (Exception e) {
+                    } catch (Throwable e) {
                         rc.set(-1);
                         LOG.error("Exception in new ledger thread", e);
                     }
@@ -400,51 +409,73 @@ public class LedgerCacheTest {
             };
         newLedgerThread.start();
 
-        Thread flushThread = new Thread() {
+        Thread[] flushThreads = new Thread[numFlushers];
+        for (int i = 0; i < numFlushers; ++i) {
+            Thread flushThread = new Thread() {
                 public void run() {
                     try {
-                        while (true) {
-                            Long id = ledgerQ.peek();
-                            if (id == null) {
-                                continue;
-                            }
-                            LOG.info("Put entry for {}", id);
-                            try {
-                                ledgerCache.putEntryOffset((long) id, 1, 0);
-                            } catch (Bookie.NoLedgerException nle) {
-                                //ignore
-                            }
+                        while (running.get()) {
                             ledgerCache.flushLedger(true);
                         }
-                    } catch (Exception e) {
+                    } catch (Throwable e) {
                         rc.set(-1);
                         LOG.error("Exception in flush thread", e);
                     }
+                    LOG.error("Shutting down flush thread");
                 }
             };
-        flushThread.start();
+            flushThread.start();
+            flushThreads[i] = flushThread;
+        }
 
-        Thread deleteThread = new Thread() {
+        Thread[] deleteThreads = new Thread[numDeleters];
+        for (int i = 0; i < numDeleters; ++i) {
+            Thread deleteThread = new Thread() {
                 public void run() {
                     try {
                         while (true) {
                             long id = ledgerQ.take();
+                            if (id == -1L) {
+                                break;
+                            }
                             LOG.info("Deleting {}", id);
                             ledgerCache.deleteLedger(id);
                         }
-                    } catch (Exception e) {
+                    } catch (Throwable e) {
                         rc.set(-1);
                         LOG.error("Exception in delete thread", e);
                     }
                 }
             };
-        deleteThread.start();
+            deleteThread.start();
+            deleteThreads[i] = deleteThread;
+        }
 
         newLedgerThread.join();
-        assertEquals("Should have been no errors", rc.get(), 0);
 
-        deleteThread.interrupt();
-        flushThread.interrupt();
+        for (Thread deleteThread : deleteThreads) {
+            deleteThread.join();
+        }
+
+        running.set(false);
+        for (Thread flushThread : flushThreads) {
+            flushThread.join();
+        }
+
+        assertEquals("Should have been no errors", rc.get(), 0);
+        for (long i = 0L; i < numLedgers; ++i) {
+            boolean gotError = false;
+            try {
+                LOG.error("Checking {}", i);
+                ledgerCache.getEntryOffset(i, 0);
+            } catch (NoLedgerException e) {
+                gotError = true;
+            }
+            if (!gotError) {
+                LOG.error("Ledger {} is still around", i);
+                fail("Found ledger " + i + ", which should have been removed");
+            }
+        }
     }
 
     // Mock SortedLedgerStorage to simulate flush failure (Dependency Fault Injection)