You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by sj...@apache.org on 2019/01/04 00:41:42 UTC
[bookkeeper] branch master updated: ISSUE #1892: ILS: reset retry in consistency check loop
This is an automated email from the ASF dual-hosted git repository.
sjust pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new ed7ca37 ISSUE #1892: ILS: reset retry in consistency check loop
ed7ca37 is described below
commit ed7ca37e96639b8d7105653e971c1bd1b4533f8a
Author: Samuel Just <sj...@salesforce.com>
AuthorDate: Thu Jan 3 16:41:37 2019 -0800
ISSUE #1892: ILS: reset retry in consistency check loop
This patch additionally:
- modifies InterleavedLedgerStorageTest.java to test with and without
  entryLogPerLedger
- refactors the test a bit to ensure that the gc calls really do race
  with the right part of the checker
- addresses a few other, more cosmetic errors
(bug W-5721713)
Signed-off-by: Samuel Just <sj...@salesforce.com>
Reviewers: Enrico Olivelli <eo...@gmail.com>
This closes #1894 from athanatos/forupstream/wip-1892, closes #1892
---
.../bookkeeper/bookie/BookKeeperServerStats.java | 1 +
.../org/apache/bookkeeper/bookie/BookieShell.java | 2 +-
.../org/apache/bookkeeper/bookie/EntryLogger.java | 10 +-
.../bookie/InterleavedLedgerStorage.java | 31 ++++-
.../bookie/InterleavedLedgerStorageTest.java | 132 ++++++++++++++++++---
5 files changed, 152 insertions(+), 24 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
index 69d0eda..cdafd15 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
@@ -111,6 +111,7 @@ public interface BookKeeperServerStats {
// Ledger Storage Scrub Stats
String STORAGE_SCRUB_PAGES_SCANNED = "STORAGE_SCRUB_PAGES_SCANNED";
+ String STORAGE_SCRUB_PAGE_RETRIES = "STORAGE_SCRUB_PAGE_RETRIES";
// Ledger Cache Stats
String LEDGER_CACHE_READ_PAGE = "LEDGER_CACHE_READ_PAGE";
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index 32e288b..f53195a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -1234,7 +1234,7 @@ public class BookieShell implements Tool {
}
/**
- * Print the metadata for a ledger.
+ * Check local storage for inconsistencies.
*/
class LocalConsistencyCheck extends MyCommand {
Options lOpts = new Options();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index 6b41b6c..af82620 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -696,6 +696,10 @@ public class EntryLogger {
}
+ static long posForOffset(long location) {
+ return location & 0xffffffffL;
+ }
+
/**
* Exception type for representing lookup errors. Useful for disambiguating different error
@@ -778,7 +782,7 @@ public class EntryLogger {
throws EntryLookupException, IOException {
ByteBuf sizeBuff = sizeBuffer.get();
sizeBuff.clear();
- pos -= 4; // we want to get the ledgerId and length to check
+ pos -= 4; // we want to get the entrySize as well as the ledgerId and entryId
BufferedReadChannel fc;
try {
fc = getChannelForLogId(entryLogId);
@@ -817,14 +821,14 @@ public class EntryLogger {
void checkEntry(long ledgerId, long entryId, long location) throws EntryLookupException, IOException {
long entryLogId = logIdForOffset(location);
- long pos = location & 0xffffffffL;
+ long pos = posForOffset(location);
getFCForEntryInternal(ledgerId, entryId, entryLogId, pos);
}
public ByteBuf internalReadEntry(long ledgerId, long entryId, long location)
throws IOException, Bookie.NoEntryException {
long entryLogId = logIdForOffset(location);
- long pos = location & 0xffffffffL;
+ long pos = posForOffset(location);
final EntryLogEntry entry;
try {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index 6078d69..d1287c3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -29,6 +29,7 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ENTRYLOGGER_SCO
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_GET_ENTRY;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_GET_OFFSET;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_SCRUB_PAGES_SCANNED;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_SCRUB_PAGE_RETRIES;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
@@ -55,6 +56,7 @@ import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.proto.BookieProtocol;
+import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.annotations.StatsDoc;
@@ -112,6 +114,7 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
)
private OpStatsLogger getEntryStats;
private OpStatsLogger pageScanStats;
+ private Counter retryCounter;
@VisibleForTesting
public InterleavedLedgerStorage() {
@@ -149,11 +152,34 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
Checkpointer checkpointer,
EntryLogListener entryLogListener,
StatsLogger statsLogger) throws IOException {
+ initializeWithEntryLogger(
+ conf,
+ ledgerManager,
+ ledgerDirsManager,
+ indexDirsManager,
+ stateManager,
+ checkpointSource,
+ checkpointer,
+ new EntryLogger(conf, ledgerDirsManager, entryLogListener, statsLogger.scope(ENTRYLOGGER_SCOPE)),
+ statsLogger);
+ }
+
+ @VisibleForTesting
+ public void initializeWithEntryLogger(ServerConfiguration conf,
+ LedgerManager ledgerManager,
+ LedgerDirsManager ledgerDirsManager,
+ LedgerDirsManager indexDirsManager,
+ StateManager stateManager,
+ CheckpointSource checkpointSource,
+ Checkpointer checkpointer,
+ EntryLogger entryLogger,
+ StatsLogger statsLogger) throws IOException {
checkNotNull(checkpointSource, "invalid null checkpoint source");
checkNotNull(checkpointer, "invalid null checkpointer");
+ this.entryLogger = entryLogger;
+ this.entryLogger.addListener(this);
this.checkpointSource = checkpointSource;
this.checkpointer = checkpointer;
- entryLogger = new EntryLogger(conf, ledgerDirsManager, entryLogListener, statsLogger.scope(ENTRYLOGGER_SCOPE));
ledgerCache = new LedgerCacheImpl(conf, activeLedgers,
null == indexDirsManager ? ledgerDirsManager : indexDirsManager, statsLogger);
gcThread = new GarbageCollectorThread(conf, ledgerManager, this, statsLogger.scope("gc"));
@@ -162,6 +188,7 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
getOffsetStats = statsLogger.getOpStatsLogger(STORAGE_GET_OFFSET);
getEntryStats = statsLogger.getOpStatsLogger(STORAGE_GET_ENTRY);
pageScanStats = statsLogger.getOpStatsLogger(STORAGE_SCRUB_PAGES_SCANNED);
+ retryCounter = statsLogger.getCounter(STORAGE_SCRUB_PAGE_RETRIES);
}
private LedgerDirsListener getLedgerDirsListener() {
@@ -538,6 +565,7 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
@Cleanup LedgerEntryPage lep = page.getLEP();
MutableBoolean retry = new MutableBoolean(false);
do {
+ retry.setValue(false);
int version = lep.getVersion();
MutableBoolean success = new MutableBoolean(true);
@@ -556,6 +584,7 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
} else {
LOG.debug("localConsistencyCheck: concurrent modification, retrying");
retry.setValue(true);
+ retryCounter.inc();
}
return false;
} else {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java
index fd4f314..4fde8e7 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java
@@ -20,6 +20,8 @@
*/
package org.apache.bookkeeper.bookie;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_SCRUB_PAGE_RETRIES;
import static org.junit.Assert.assertEquals;
import io.netty.buffer.ByteBuf;
@@ -27,9 +29,11 @@ import io.netty.buffer.Unpooled;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Optional;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -37,18 +41,37 @@ import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.test.TestStatsProvider;
import org.apache.bookkeeper.util.DiskChecker;
import org.apache.bookkeeper.util.EntryFormatter;
import org.apache.bookkeeper.util.LedgerIdFormatter;
+import org.apache.commons.lang.mutable.MutableInt;
import org.apache.commons.lang.mutable.MutableLong;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Test for InterleavedLedgerStorage.
*/
+@RunWith(Parameterized.class)
public class InterleavedLedgerStorageTest {
+ private static final Logger LOG = LoggerFactory.getLogger(InterleavedLedgerStorageTest.class);
+
+ @Parameterized.Parameters
+ public static Iterable<Boolean> elplSetting() {
+ return Arrays.asList(true, false);
+ }
+
+ public InterleavedLedgerStorageTest(boolean elplSetting) {
+ conf.setEntryLogSizeLimit(2048);
+ conf.setEntryLogPerLedgerEnabled(elplSetting);
+ }
CheckpointSource checkpointSource = new CheckpointSource() {
@Override
@@ -73,8 +96,41 @@ public class InterleavedLedgerStorageTest {
}
};
+ static class TestableEntryLogger extends EntryLogger {
+ public interface CheckEntryListener {
+ void accept(long ledgerId,
+ long entryId,
+ long entryLogId,
+ long pos);
+ }
+ volatile CheckEntryListener testPoint;
+
+ public TestableEntryLogger(
+ ServerConfiguration conf,
+ LedgerDirsManager ledgerDirsManager,
+ EntryLogListener listener,
+ StatsLogger statsLogger) throws IOException {
+ super(conf, ledgerDirsManager, listener, statsLogger);
+ }
+
+ void setCheckEntryTestPoint(CheckEntryListener testPoint) throws InterruptedException {
+ this.testPoint = testPoint;
+ }
+
+ @Override
+ void checkEntry(long ledgerId, long entryId, long location) throws EntryLookupException, IOException {
+ CheckEntryListener runBefore = testPoint;
+ if (runBefore != null) {
+ runBefore.accept(ledgerId, entryId, logIdForOffset(location), posForOffset(location));
+ }
+ super.checkEntry(ledgerId, entryId, location);
+ }
+ }
+
+ TestStatsProvider statsProvider = new TestStatsProvider();
ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
LedgerDirsManager ledgerDirsManager;
+ TestableEntryLogger entryLogger;
InterleavedLedgerStorage interleavedStorage = new InterleavedLedgerStorage();
final long numWrites = 2000;
final long entriesPerWrite = 2;
@@ -87,14 +143,16 @@ public class InterleavedLedgerStorageTest {
File curDir = Bookie.getCurrentDirectory(tmpDir);
Bookie.checkDirectoryStructure(curDir);
- conf = TestBKConfiguration.newServerConfiguration();
- conf.setLedgerDirNames(new String[] { tmpDir.toString() });
- conf.setEntryLogSizeLimit(2048);
+ conf.setLedgerDirNames(new String[]{tmpDir.toString()});
ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
- interleavedStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager,
- null, checkpointSource, checkpointer, NullStatsLogger.INSTANCE);
+ entryLogger = new TestableEntryLogger(
+ conf, ledgerDirsManager, null, NullStatsLogger.INSTANCE);
+ interleavedStorage.initializeWithEntryLogger(
+ conf, null, ledgerDirsManager, ledgerDirsManager,
+ null, checkpointSource, checkpointer, entryLogger,
+ statsProvider.getStatsLogger(BOOKIE_SCOPE));
// Insert some ledger & entries in the interleaved storage
for (long entryId = 0; entryId < numWrites; entryId++) {
@@ -132,41 +190,77 @@ public class InterleavedLedgerStorageTest {
}
@Test
- public void testConsistencyCheckConcurrentModification() throws Exception {
- AtomicBoolean done = new AtomicBoolean(false);
- EntryLogger entryLogger = interleavedStorage.getEntryLogger();
- List<Exception> asyncErrors = new ArrayList<>();
+ public void testConsistencyCheckConcurrentGC() throws Exception {
+ final long signalDone = -1;
+ final List<Exception> asyncErrors = new ArrayList<>();
+ final LinkedBlockingQueue<Long> toCompact = new LinkedBlockingQueue<>();
+ final Semaphore awaitingCompaction = new Semaphore(0);
+
+ interleavedStorage.flush();
+ final long lastLogId = entryLogger.getLeastUnflushedLogId();
+
+ final MutableInt counter = new MutableInt(0);
+ entryLogger.setCheckEntryTestPoint((ledgerId, entryId, entryLogId, pos) -> {
+ if (entryLogId < lastLogId) {
+ if (counter.intValue() % 100 == 0) {
+ try {
+ toCompact.put(entryLogId);
+ awaitingCompaction.acquire();
+ } catch (InterruptedException e) {
+ asyncErrors.add(e);
+ }
+ }
+ counter.increment();
+ }
+ });
+
Thread mutator = new Thread(() -> {
EntryLogCompactor compactor = new EntryLogCompactor(
conf,
entryLogger,
interleavedStorage,
entryLogger::removeEntryLog);
- long next = 0;
- while (!done.get()) {
+ while (true) {
+ Long next = null;
try {
+ next = toCompact.take();
+ if (next == null || next == signalDone) {
+ break;
+ }
compactor.compact(entryLogger.getEntryLogMetadata(next));
- next++;
- } catch (IOException e) {
+ } catch (BufferedChannelBase.BufferedChannelClosedException e) {
+ // next was already removed, ignore
+ } catch (Exception e) {
asyncErrors.add(e);
break;
+ } finally {
+ if (next != null) {
+ awaitingCompaction.release();
+ }
}
}
});
mutator.start();
- for (int i = 0; i < 100; ++i) {
- assert interleavedStorage.localConsistencyCheck(Optional.empty()).size() == 0;
- Thread.sleep(10);
+ List<LedgerStorage.DetectedInconsistency> inconsistencies = interleavedStorage.localConsistencyCheck(
+ Optional.empty());
+ for (LedgerStorage.DetectedInconsistency e: inconsistencies) {
+ LOG.error("Found: {}", e);
}
+ Assert.assertEquals(0, inconsistencies.size());
- done.set(true);
+ toCompact.offer(signalDone);
mutator.join();
for (Exception e: asyncErrors) {
throw e;
}
- }
+ if (!conf.isEntryLogPerLedgerEnabled()) {
+ Assert.assertNotEquals(
+ 0,
+ statsProvider.getCounter(BOOKIE_SCOPE + "." + STORAGE_SCRUB_PAGE_RETRIES).get().longValue());
+ }
+ }
@Test
public void testConsistencyMissingEntry() throws Exception {