Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/10/31 08:20:34 UTC
[bookkeeper] branch branch-4.7 updated: ISSUE #1757: prevent race
between flush and delete from recreating index
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/branch-4.7 by this push:
new 1c54206 ISSUE #1757: prevent race between flush and delete from recreating index
1c54206 is described below
commit 1c54206c985a4d361e8bffa4cf9de4265a1bfc45
Author: Samuel Just <sj...@salesforce.com>
AuthorDate: Wed Oct 31 01:20:30 2018 -0700
ISSUE #1757: prevent race between flush and delete from recreating index
IndexPersistenceManager.flushLedgerHandle can race with delete by
obtaining a FileInfo just prior to the delete and then proceeding to
rewrite the file info, resurrecting it. FileInfo provides a convenient
point of synchronization for avoiding this race. FileInfo.moveLedgerIndexFile,
FileInfo.flushHeader, and FileInfo.delete() are already synchronized, so
this patch simply adds a deleted flag to the FileInfo object to indicate
that the FileInfo has become invalid. checkOpen is called in every
method and will now throw FileInfoDeletedException if delete has been
called. IndexPersistenceManager can catch it and deal with it
appropriately in flush (which generally means moving on to the next ledger).
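The deleted-flag pattern described above can be sketched as follows. This is a minimal illustration with a hypothetical IndexFileHandle class, not the real FileInfo, which carries much more state:

```java
import java.io.IOException;

// Minimal sketch of the deleted-flag pattern (hypothetical class; the real
// FileInfo also manages the channel, master key, header version, etc.).
class IndexFileHandle {
    private boolean deleted = false;

    // Called at the start of every method that touches the file on disk.
    private synchronized void checkOpen() throws IOException {
        if (deleted) {
            throw new IOException("FileInfo already deleted");
        }
    }

    public synchronized void flushHeader() throws IOException {
        checkOpen();    // throws if delete() has already run
        // ... write header to disk ...
    }

    public synchronized boolean delete() {
        deleted = true; // every later call observes the flag under the lock
        return true;    // the real code also deletes the backing file
    }
}
```

Because both the flag write in delete() and the flag read in checkOpen() happen under the object monitor, a flush that obtained the handle before the delete can no longer resurrect the file afterwards.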
This patch also eliminates ledgersToFlush and ledgersFlushing. Their
purpose appears to have been to let delete avoid flushing a ledger that
had been selected for flushing but not yet flushed, thereby avoiding the
above race. That is not sufficient, however, because IndexInMemPageMgr
calls IndexPersistenceManager.flushLedgerHeader, which can obtain a
FileInfo for the ledger prior to the deletion and then call
relocateIndexFileAndFlushHeader afterwards.
Also, if the purpose was to avoid concurrent calls into
flushSpecificLedger on the same ledger, it's wrong because of the
following sequence:
t0: thread 0 calls flushOneOrMoreLedgers
t1: thread 0 places ledger 10 into ledgersFlushing and completes
flushSpecificLedger
t2: thread 2 performs a write to ledger 10
t3: thread 1 calls flushOneOrMoreLedgers skipping ledger 10
t4: thread 0 releases ledger 10 from ledgersFlushing
t5: thread 1 completes flushOneOrMoreLedgers
Although thread 1 begins to flush after the write to ledger 10, it won't
capture the write, rendering the flush incorrect. I don't think it's
actually worth avoiding overlapping flushes here, because both FileInfo
and LedgerEntryPage track dirty state. As such, it seems simpler to
just get rid of them.
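With those structures gone, the flush loop amounts to snapshotting the active ledgers and tolerating concurrent deletes. A minimal sketch, with stand-in FlushLoopSketch/Flusher names (hypothetical; the real loop lives in IndexInMemPageMgr.flushOneOrMoreLedgers):

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Sketch of the simplified flush loop: snapshot the active ledgers, flush
// each one, and treat a concurrent delete (NoLedgerException) as "nothing
// left to flush" for that ledger.
class FlushLoopSketch {
    static class NoLedgerException extends IOException {
        NoLedgerException(long id) {
            super("No such ledger " + id);
        }
    }

    interface Flusher {
        void flushSpecificLedger(long id) throws IOException;
    }

    static void flushOneOrMoreLedgers(Set<Long> activeLedgers, Flusher f, boolean doAll)
            throws IOException {
        // Snapshot; the live set may change while we iterate.
        List<Long> toFlush = new ArrayList<>(activeLedgers);
        for (Long id : toFlush) {
            try {
                f.flushSpecificLedger(id);
            } catch (NoLedgerException e) {
                continue; // deleted concurrently; dirty-state tracking makes skipping safe
            }
            if (!doAll) {
                break;
            }
        }
    }
}
```

Overlapping flushes of the same ledger are harmless here precisely because FileInfo and LedgerEntryPage track dirty state themselves.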
This patch also adds a more aggressive version of testFlushDeleteRace to
test the new behavior. Testing with multiple flushers turned up a bug
in LedgerEntryPage.getPageToWrite, which didn't return buffers with
independent read pointers, so this patch addresses that as well.
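The getPageToWrite fix relies on ByteBuffer.duplicate(), which shares the underlying bytes but gives each caller its own position and limit. A small sketch (DuplicateDemo is a hypothetical stand-in for LedgerEntryPage):

```java
import java.nio.ByteBuffer;

// Illustration of the getPageToWrite fix: duplicate() shares the page's
// bytes but gives each caller an independent position, so one flusher's
// reads cannot advance another flusher's read pointer.
class DuplicateDemo {
    static ByteBuffer page = ByteBuffer.allocate(8);

    // Stand-in for LedgerEntryPage.getPageToWrite after the fix.
    static ByteBuffer getPageToWrite() {
        page.clear();           // reset position/limit for a full read
        return page.duplicate(); // independent read pointer per caller
    }

    public static void main(String[] args) {
        page.putLong(42L);
        ByteBuffer a = getPageToWrite();
        ByteBuffer b = getPageToWrite();
        a.getLong();                       // advances only a's position
        System.out.println(b.remaining()); // b still sees the whole page
    }
}
```

Before the fix, two flushers writing the same page would consume a single shared position, so the second writer could see an empty buffer and emit a short write.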
(bug W-5549455)
(rev cguttapalem)
Signed-off-by: Samuel Just <sjust@salesforce.com>
(cherry picked from commit 7b5ac3d5e76ac4df618764cafe80aef2994703ec)
Author:
Reviewers: Enrico Olivelli <eolivelli@gmail.com>, Sijie Guo <sijie@apache.org>
This closes #1769 from athanatos/forupstream/wip-1757, closes #1757
(cherry picked from commit 41e4bccb9694e1f373e919f6891b8e88b2232c5e)
Conflicts:
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
Minor conflict over fileInfoVersionToWrite from the explicit lac patch.
Reviewers: Sijie Guo <si...@apache.org>
This closes #1774 from athanatos/forupstream/wip-1757-4.7, closes #1757
---
.../org/apache/bookkeeper/bookie/FileInfo.java | 17 +++++
.../bookkeeper/bookie/FileInfoBackingCache.java | 2 +
.../bookkeeper/bookie/IndexInMemPageMgr.java | 22 ++----
.../bookkeeper/bookie/IndexPersistenceMgr.java | 16 ++++-
.../apache/bookkeeper/bookie/LedgerEntryPage.java | 3 +-
.../apache/bookkeeper/bookie/LedgerCacheTest.java | 83 +++++++++++++++-------
6 files changed, 95 insertions(+), 48 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
index eb6d9e1..b6794a3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
@@ -91,12 +91,15 @@ class FileInfo extends Watchable<LastAddConfirmedUpdateNotification> {
// file access mode
protected String mode;
+ private boolean deleted;
+
public FileInfo(File lf, byte[] masterKey) throws IOException {
super(WATCHER_RECYCLER);
this.lf = lf;
this.masterKey = masterKey;
mode = "rw";
+ this.deleted = false;
}
synchronized Long getLastAddConfirmed() {
@@ -224,6 +227,16 @@ class FileInfo extends Watchable<LastAddConfirmedUpdateNotification> {
}
}
+ public synchronized boolean isDeleted() {
+ return deleted;
+ }
+
+ public static class FileInfoDeletedException extends IOException {
+ FileInfoDeletedException() {
+ super("FileInfo already deleted");
+ }
+ }
+
@VisibleForTesting
void checkOpen(boolean create) throws IOException {
checkOpen(create, false);
@@ -231,6 +244,9 @@ class FileInfo extends Watchable<LastAddConfirmedUpdateNotification> {
private synchronized void checkOpen(boolean create, boolean openBeforeClose)
throws IOException {
+ if (deleted) {
+ throw new FileInfoDeletedException();
+ }
if (fc != null) {
return;
}
@@ -497,6 +513,7 @@ class FileInfo extends Watchable<LastAddConfirmedUpdateNotification> {
}
public synchronized boolean delete() {
+ deleted = true;
return lf.delete();
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfoBackingCache.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfoBackingCache.java
index 31fd077..fa81db3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfoBackingCache.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfoBackingCache.java
@@ -47,6 +47,8 @@ class FileInfoBackingCache {
boolean retained = fi.tryRetain();
if (!retained) {
throw new IOException("FileInfo " + fi + " is already marked dead");
+ } else if (fi.isDeleted()) {
+ throw new Bookie.NoLedgerException(fi.ledgerId);
}
return fi;
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java
index 66e97f7..0cf5cc9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java
@@ -40,7 +40,6 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -345,12 +344,6 @@ class IndexInMemPageMgr {
// flush and read pages
private final IndexPersistenceMgr indexPersistenceManager;
- /**
- * the list of potentially dirty ledgers.
- */
- private final ConcurrentLinkedQueue<Long> ledgersToFlush = new ConcurrentLinkedQueue<Long>();
- private final ConcurrentSkipListSet<Long> ledgersFlushing = new ConcurrentSkipListSet<Long>();
-
// Stats
private final Counter ledgerCacheHitCounter;
private final Counter ledgerCacheMissCounter;
@@ -504,7 +497,6 @@ class IndexInMemPageMgr {
void removePagesForLedger(long ledgerId) {
pageMapAndList.removeEntriesForALedger(ledgerId);
- ledgersToFlush.remove(ledgerId);
}
long getLastEntryInMem(long ledgerId) {
@@ -542,18 +534,12 @@ class IndexInMemPageMgr {
}
void flushOneOrMoreLedgers(boolean doAll) throws IOException {
- if (ledgersToFlush.isEmpty()) {
- ledgersToFlush.addAll(pageMapAndList.getActiveLedgers());
- }
- Long potentiallyDirtyLedger;
- while (null != (potentiallyDirtyLedger = ledgersToFlush.poll())) {
- if (!ledgersFlushing.add(potentiallyDirtyLedger)) {
- continue;
- }
+ List<Long> ledgersToFlush = new ArrayList<>(pageMapAndList.getActiveLedgers());
+ for (Long potentiallyDirtyLedger : ledgersToFlush) {
try {
flushSpecificLedger(potentiallyDirtyLedger);
- } finally {
- ledgersFlushing.remove(potentiallyDirtyLedger);
+ } catch (Bookie.NoLedgerException e) {
+ continue;
}
if (!doAll) {
break;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
index eb3b935..0f21709 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
@@ -511,7 +511,12 @@ public class IndexPersistenceMgr {
private void moveLedgerIndexFile(Long l, FileInfo fi) throws NoWritableLedgerDirException, IOException {
File newLedgerIndexFile = getNewLedgerIndexFile(l, getLedgerDirForLedger(fi));
- fi.moveToNewLocation(newLedgerIndexFile, fi.getSizeSinceLastwrite());
+ try {
+ fi.moveToNewLocation(newLedgerIndexFile, fi.getSizeSinceLastwrite());
+ } catch (FileInfo.FileInfoDeletedException fileInfoDeleted) {
+ // File concurrently deleted
+ throw new Bookie.NoLedgerException(l);
+ }
}
void flushLedgerHeader(long ledger) throws IOException {
@@ -585,7 +590,7 @@ public class IndexPersistenceMgr {
private void writeBuffers(Long ledger,
List<LedgerEntryPage> entries, FileInfo fi,
- int start, int count) throws IOException {
+ int start, int count) throws IOException, Bookie.NoLedgerException {
if (LOG.isTraceEnabled()) {
LOG.trace("Writing {} buffers of {}", count, Long.toHexString(ledger));
}
@@ -602,7 +607,12 @@ public class IndexPersistenceMgr {
}
long totalWritten = 0;
while (buffs[buffs.length - 1].remaining() > 0) {
- long rc = fi.write(buffs, entries.get(start + 0).getFirstEntryPosition());
+ long rc = 0;
+ try {
+ rc = fi.write(buffs, entries.get(start + 0).getFirstEntryPosition());
+ } catch (FileInfo.FileInfoDeletedException e) {
+ throw new Bookie.NoLedgerException(ledger);
+ }
if (rc <= 0) {
throw new IOException("Short write to ledger " + ledger + " rc = " + rc);
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java
index a9cef72..be87559 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java
@@ -207,7 +207,8 @@ public class LedgerEntryPage {
public ByteBuffer getPageToWrite() {
checkPage();
page.clear();
- return page;
+ // Different callers to this method should be able to reasonably expect independent read pointers
+ return page.duplicate();
}
long getLedger() {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
index 1c51209..9ae8633 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
@@ -375,24 +375,33 @@ public class LedgerCacheTest {
}
/**
- * Race where a flush would fail because a garbage collection occurred at
- * the wrong time.
+ * Test for race between delete and flush.
* {@link https://issues.apache.org/jira/browse/BOOKKEEPER-604}
+ * {@link https://github.com/apache/bookkeeper/issues/1757}
*/
@Test
public void testFlushDeleteRace() throws Exception {
newLedgerCache();
final AtomicInteger rc = new AtomicInteger(0);
- final LinkedBlockingQueue<Long> ledgerQ = new LinkedBlockingQueue<Long>(1);
+ final LinkedBlockingQueue<Long> ledgerQ = new LinkedBlockingQueue<>(100);
final byte[] masterKey = "masterKey".getBytes();
+ final long numLedgers = 1000;
+ final int numFlushers = 10;
+ final int numDeleters = 10;
+ final AtomicBoolean running = new AtomicBoolean(true);
Thread newLedgerThread = new Thread() {
public void run() {
try {
- for (int i = 0; i < 1000 && rc.get() == 0; i++) {
+ for (long i = 0; i < numLedgers && rc.get() == 0; i++) {
ledgerCache.setMasterKey(i, masterKey);
- ledgerQ.put((long) i);
+
+ ledgerCache.putEntryOffset(i, 1, 0);
+ ledgerQ.put(i);
+ }
+ for (int i = 0; i < numDeleters; ++i) {
+ ledgerQ.put(-1L);
}
- } catch (Exception e) {
+ } catch (Throwable e) {
rc.set(-1);
LOG.error("Exception in new ledger thread", e);
}
@@ -400,51 +409,73 @@ public class LedgerCacheTest {
};
newLedgerThread.start();
- Thread flushThread = new Thread() {
+ Thread[] flushThreads = new Thread[numFlushers];
+ for (int i = 0; i < numFlushers; ++i) {
+ Thread flushThread = new Thread() {
public void run() {
try {
- while (true) {
- Long id = ledgerQ.peek();
- if (id == null) {
- continue;
- }
- LOG.info("Put entry for {}", id);
- try {
- ledgerCache.putEntryOffset((long) id, 1, 0);
- } catch (Bookie.NoLedgerException nle) {
- //ignore
- }
+ while (running.get()) {
ledgerCache.flushLedger(true);
}
- } catch (Exception e) {
+ } catch (Throwable e) {
rc.set(-1);
LOG.error("Exception in flush thread", e);
}
+ LOG.error("Shutting down flush thread");
}
};
- flushThread.start();
+ flushThread.start();
+ flushThreads[i] = flushThread;
+ }
- Thread deleteThread = new Thread() {
+ Thread[] deleteThreads = new Thread[numDeleters];
+ for (int i = 0; i < numDeleters; ++i) {
+ Thread deleteThread = new Thread() {
public void run() {
try {
while (true) {
long id = ledgerQ.take();
+ if (id == -1L) {
+ break;
+ }
LOG.info("Deleting {}", id);
ledgerCache.deleteLedger(id);
}
- } catch (Exception e) {
+ } catch (Throwable e) {
rc.set(-1);
LOG.error("Exception in delete thread", e);
}
}
};
- deleteThread.start();
+ deleteThread.start();
+ deleteThreads[i] = deleteThread;
+ }
newLedgerThread.join();
- assertEquals("Should have been no errors", rc.get(), 0);
- deleteThread.interrupt();
- flushThread.interrupt();
+ for (Thread deleteThread : deleteThreads) {
+ deleteThread.join();
+ }
+
+ running.set(false);
+ for (Thread flushThread : flushThreads) {
+ flushThread.join();
+ }
+
+ assertEquals("Should have been no errors", rc.get(), 0);
+ for (long i = 0L; i < numLedgers; ++i) {
+ boolean gotError = false;
+ try {
+ LOG.error("Checking {}", i);
+ ledgerCache.getEntryOffset(i, 0);
+ } catch (NoLedgerException e) {
+ gotError = true;
+ }
+ if (!gotError) {
+ LOG.error("Ledger {} is still around", i);
+ fail("Found ledger " + i + ", which should have been removed");
+ }
+ }
}
// Mock SortedLedgerStorage to simulate flush failure (Dependency Fault Injection)