You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2019/02/20 10:43:33 UTC

[bookkeeper] branch master updated: BOOKKEEPER-1919: putEntryOffset translate FileInfoDeletedException

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 10272ea  BOOKKEEPER-1919: putEntryOffset translate FileInfoDeletedException
10272ea is described below

commit 10272ead9d9b446aeee3d8acd5df9febfd5705c7
Author: Samuel Just <sj...@salesforce.com>
AuthorDate: Wed Feb 20 02:43:29 2019 -0800

    BOOKKEEPER-1919: putEntryOffset translate FileInfoDeletedException
    
    IndexInMemPageMgr should translate FileInfoDeletedException into
    NoLedgerException as expected by users like
    InterleavedLedgerStorage.updateEntriesLocations and
    EntryMemTable.flushSnapshot.
    
    Signed-off-by: Samuel Just <sjustsalesforce.com>
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Sijie Guo <si...@apache.org>
    
    This closes #1950 from athanatos/forupstream/wip-1919
---
 .../bookkeeper/bookie/IndexInMemPageMgr.java       |   2 +
 .../apache/bookkeeper/bookie/LedgerCacheTest.java  | 122 +++++++++++++++++++++
 2 files changed, 124 insertions(+)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java
index 1e7d432..0b77dd7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java
@@ -571,6 +571,8 @@ class IndexInMemPageMgr {
             lep = getLedgerEntryPage(ledger, pageEntry);
             assert lep != null;
             lep.setOffset(offset, offsetInPage * LedgerEntryPage.getIndexEntrySize());
+        } catch (FileInfo.FileInfoDeletedException e) {
+            throw new Bookie.NoLedgerException(ledger);
         } finally {
             if (null != lep) {
                 lep.releasePage();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
index a606f9b..405b93a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
@@ -379,6 +379,128 @@ public class LedgerCacheTest {
         }
     }
 
+
+    /**
+     * Test for race between putEntryOffset and flush.
+     * {@link https://github.com/apache/bookkeeper/issues/1919}
+     */
+    @Test
+    public void testPutEntryOffsetDeleteRace() throws Exception {
+        newLedgerCache();
+        final AtomicInteger rc = new AtomicInteger(0);
+        final LinkedBlockingQueue<Long> putQ = new LinkedBlockingQueue<>(100);
+        final LinkedBlockingQueue<Long> deleteQ = new LinkedBlockingQueue<>(100);
+        final byte[] masterKey = "masterKey".getBytes();
+        final long numLedgers = 1000;
+        final int numPutters = 10;
+        final int numDeleters = 10;
+        final AtomicBoolean running = new AtomicBoolean(true);
+        Thread newLedgerThread = new Thread() {
+                public void run() {
+                    try {
+                        for (long i = 0; i < numLedgers && rc.get() == 0; i++) {
+                            ledgerCache.setMasterKey(i, masterKey);
+
+                            ledgerCache.putEntryOffset(i, 1, 0);
+                            deleteQ.put(i);
+                            putQ.put(i);
+                        }
+                        for (int i = 0; i < numPutters; ++i) {
+                            putQ.put(-1L);
+                        }
+                        for (int i = 0; i < numDeleters; ++i) {
+                            deleteQ.put(-1L);
+                        }
+                    } catch (Throwable e) {
+                        rc.set(-1);
+                        LOG.error("Exception in new ledger thread", e);
+                    }
+                }
+            };
+        newLedgerThread.start();
+
+        Thread[] flushThreads = new Thread[numPutters];
+        for (int i = 0; i < numPutters; ++i) {
+            Thread flushThread = new Thread() {
+                public void run() {
+                    try {
+                        while (true) {
+                            long id = putQ.take();
+                            if (id == -1L) {
+                                break;
+                            }
+                            LOG.info("Putting {}", id);
+                            try {
+                                ledgerCache.putEntryOffset(id, 2, 0);
+                                ledgerCache.deleteLedger(id);
+                            } catch (NoLedgerException e) {
+                                // No problem
+                            }
+                        }
+                    } catch (Throwable e) {
+                        rc.set(-1);
+                        LOG.error("Exception in put thread", e);
+                    }
+                }
+            };
+            flushThread.start();
+            flushThreads[i] = flushThread;
+        }
+
+        Thread[] deleteThreads = new Thread[numDeleters];
+        for (int i = 0; i < numDeleters; ++i) {
+            Thread deleteThread = new Thread() {
+                public void run() {
+                    try {
+                        while (true) {
+                            long id = deleteQ.take();
+                            if (id == -1L) {
+                                break;
+                            }
+                            LOG.info("Deleting {}", id);
+                            try {
+                                ledgerCache.deleteLedger(id);
+                            } catch (NoLedgerException e) {
+                                // No problem
+                            }
+                        }
+                    } catch (Throwable e) {
+                        rc.set(-1);
+                        LOG.error("Exception in delete thread", e);
+                    }
+                }
+            };
+            deleteThread.start();
+            deleteThreads[i] = deleteThread;
+        }
+
+        newLedgerThread.join();
+
+        for (Thread deleteThread : deleteThreads) {
+            deleteThread.join();
+        }
+
+        running.set(false);
+        for (Thread flushThread : flushThreads) {
+            flushThread.join();
+        }
+
+        assertEquals("Should have been no errors", rc.get(), 0);
+        for (long i = 0L; i < numLedgers; ++i) {
+            boolean gotError = false;
+            try {
+                LOG.error("Checking {}", i);
+                ledgerCache.getEntryOffset(i, 0);
+            } catch (NoLedgerException e) {
+                gotError = true;
+            }
+            if (!gotError) {
+                LOG.error("Ledger {} is still around", i);
+                fail("Found ledger " + i + ", which should have been removed");
+            }
+        }
+    }
+
     /**
      * Test for race between delete and flush.
      * {@link https://issues.apache.org/jira/browse/BOOKKEEPER-604}