You are viewing a plain-text version of this content. The canonical link to it (shown as "here" in the original) was not preserved in this plain-text rendering.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2019/01/04 00:41:39 UTC

[GitHub] athanatos closed pull request #1894: ISSUE #1892: ILS: reset retry in consistency check loop

athanatos closed pull request #1894: ISSUE #1892: ILS: reset retry in consistency check loop
URL: https://github.com/apache/bookkeeper/pull/1894
 
 
   

This is a pull request merged from a forked repository.
Because GitHub hides the original diff once a pull request is merged, the diff is
reproduced below for the sake of provenance:

Because this is a foreign pull request (from a fork), the diff is supplied
below; GitHub would not otherwise display it:

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
index 69d0eda866..cdafd15a99 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
@@ -111,6 +111,7 @@
 
     // Ledger Storage Scrub Stats
     String STORAGE_SCRUB_PAGES_SCANNED = "STORAGE_SCRUB_PAGES_SCANNED";
+    String STORAGE_SCRUB_PAGE_RETRIES = "STORAGE_SCRUB_PAGE_RETRIES";
 
     // Ledger Cache Stats
     String LEDGER_CACHE_READ_PAGE = "LEDGER_CACHE_READ_PAGE";
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index 32e288b7e3..f53195a3fa 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -1234,7 +1234,7 @@ Options getOptions() {
     }
 
     /**
-     * Print the metadata for a ledger.
+     * Check local storage for inconsistencies.
      */
     class LocalConsistencyCheck extends MyCommand {
         Options lOpts = new Options();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index 6b41b6c958..af82620bf7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -696,6 +696,10 @@ static long logIdForOffset(long offset) {
     }
 
 
+    static long posForOffset(long location) {
+        return location & 0xffffffffL;
+    }
+
 
     /**
      * Exception type for representing lookup errors.  Useful for disambiguating different error
@@ -778,7 +782,7 @@ private EntryLogEntry getFCForEntryInternal(
             throws EntryLookupException, IOException {
         ByteBuf sizeBuff = sizeBuffer.get();
         sizeBuff.clear();
-        pos -= 4; // we want to get the ledgerId and length to check
+        pos -= 4; // we want to get the entrySize as well as the ledgerId and entryId
         BufferedReadChannel fc;
         try {
             fc = getChannelForLogId(entryLogId);
@@ -817,14 +821,14 @@ private EntryLogEntry getFCForEntryInternal(
 
     void checkEntry(long ledgerId, long entryId, long location) throws EntryLookupException, IOException {
         long entryLogId = logIdForOffset(location);
-        long pos = location & 0xffffffffL;
+        long pos = posForOffset(location);
         getFCForEntryInternal(ledgerId, entryId, entryLogId, pos);
     }
 
     public ByteBuf internalReadEntry(long ledgerId, long entryId, long location)
             throws IOException, Bookie.NoEntryException {
         long entryLogId = logIdForOffset(location);
-        long pos = location & 0xffffffffL;
+        long pos = posForOffset(location);
 
         final EntryLogEntry entry;
         try {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index 6078d6924a..d1287c3192 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -29,6 +29,7 @@
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_GET_ENTRY;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_GET_OFFSET;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_SCRUB_PAGES_SCANNED;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_SCRUB_PAGE_RETRIES;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
@@ -55,6 +56,7 @@
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.proto.BookieProtocol;
+import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.annotations.StatsDoc;
@@ -112,6 +114,7 @@
     )
     private OpStatsLogger getEntryStats;
     private OpStatsLogger pageScanStats;
+    private Counter retryCounter;
 
     @VisibleForTesting
     public InterleavedLedgerStorage() {
@@ -149,11 +152,34 @@ void initializeWithEntryLogListener(ServerConfiguration conf,
                                         Checkpointer checkpointer,
                                         EntryLogListener entryLogListener,
                                         StatsLogger statsLogger) throws IOException {
+        initializeWithEntryLogger(
+                conf,
+                ledgerManager,
+                ledgerDirsManager,
+                indexDirsManager,
+                stateManager,
+                checkpointSource,
+                checkpointer,
+                new EntryLogger(conf, ledgerDirsManager, entryLogListener, statsLogger.scope(ENTRYLOGGER_SCOPE)),
+                statsLogger);
+    }
+
+    @VisibleForTesting
+    public void initializeWithEntryLogger(ServerConfiguration conf,
+                LedgerManager ledgerManager,
+                LedgerDirsManager ledgerDirsManager,
+                LedgerDirsManager indexDirsManager,
+                StateManager stateManager,
+                CheckpointSource checkpointSource,
+                Checkpointer checkpointer,
+                EntryLogger entryLogger,
+                StatsLogger statsLogger) throws IOException {
         checkNotNull(checkpointSource, "invalid null checkpoint source");
         checkNotNull(checkpointer, "invalid null checkpointer");
+        this.entryLogger = entryLogger;
+        this.entryLogger.addListener(this);
         this.checkpointSource = checkpointSource;
         this.checkpointer = checkpointer;
-        entryLogger = new EntryLogger(conf, ledgerDirsManager, entryLogListener, statsLogger.scope(ENTRYLOGGER_SCOPE));
         ledgerCache = new LedgerCacheImpl(conf, activeLedgers,
                 null == indexDirsManager ? ledgerDirsManager : indexDirsManager, statsLogger);
         gcThread = new GarbageCollectorThread(conf, ledgerManager, this, statsLogger.scope("gc"));
@@ -162,6 +188,7 @@ void initializeWithEntryLogListener(ServerConfiguration conf,
         getOffsetStats = statsLogger.getOpStatsLogger(STORAGE_GET_OFFSET);
         getEntryStats = statsLogger.getOpStatsLogger(STORAGE_GET_ENTRY);
         pageScanStats = statsLogger.getOpStatsLogger(STORAGE_SCRUB_PAGES_SCANNED);
+        retryCounter = statsLogger.getCounter(STORAGE_SCRUB_PAGE_RETRIES);
     }
 
     private LedgerDirsListener getLedgerDirsListener() {
@@ -538,6 +565,7 @@ public void onRotateEntryLog() {
                     @Cleanup LedgerEntryPage lep = page.getLEP();
                     MutableBoolean retry = new MutableBoolean(false);
                     do {
+                        retry.setValue(false);
                         int version = lep.getVersion();
 
                         MutableBoolean success = new MutableBoolean(true);
@@ -556,6 +584,7 @@ public void onRotateEntryLog() {
                                     } else {
                                         LOG.debug("localConsistencyCheck: concurrent modification, retrying");
                                         retry.setValue(true);
+                                        retryCounter.inc();
                                     }
                                     return false;
                                 } else {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java
index fd4f314c4e..4fde8e74a6 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java
@@ -20,6 +20,8 @@
  */
 package org.apache.bookkeeper.bookie;
 
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_SCRUB_PAGE_RETRIES;
 import static org.junit.Assert.assertEquals;
 
 import io.netty.buffer.ByteBuf;
@@ -27,9 +29,11 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -37,18 +41,37 @@
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.test.TestStatsProvider;
 import org.apache.bookkeeper.util.DiskChecker;
 import org.apache.bookkeeper.util.EntryFormatter;
 import org.apache.bookkeeper.util.LedgerIdFormatter;
+import org.apache.commons.lang.mutable.MutableInt;
 import org.apache.commons.lang.mutable.MutableLong;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Test for InterleavedLedgerStorage.
  */
+@RunWith(Parameterized.class)
 public class InterleavedLedgerStorageTest {
+    private static final Logger LOG = LoggerFactory.getLogger(InterleavedLedgerStorageTest.class);
+
+    @Parameterized.Parameters
+    public static Iterable<Boolean> elplSetting() {
+        return Arrays.asList(true, false);
+    }
+
+    public InterleavedLedgerStorageTest(boolean elplSetting) {
+        conf.setEntryLogSizeLimit(2048);
+        conf.setEntryLogPerLedgerEnabled(elplSetting);
+    }
 
     CheckpointSource checkpointSource = new CheckpointSource() {
         @Override
@@ -73,8 +96,41 @@ public void start() {
         }
     };
 
+    static class TestableEntryLogger extends EntryLogger {
+        public interface CheckEntryListener {
+            void accept(long ledgerId,
+                        long entryId,
+                        long entryLogId,
+                        long pos);
+        }
+        volatile CheckEntryListener testPoint;
+
+        public TestableEntryLogger(
+                ServerConfiguration conf,
+                LedgerDirsManager ledgerDirsManager,
+                EntryLogListener listener,
+                StatsLogger statsLogger) throws IOException {
+            super(conf, ledgerDirsManager, listener, statsLogger);
+        }
+
+        void setCheckEntryTestPoint(CheckEntryListener testPoint) throws InterruptedException {
+            this.testPoint = testPoint;
+        }
+
+        @Override
+        void checkEntry(long ledgerId, long entryId, long location) throws EntryLookupException, IOException {
+            CheckEntryListener runBefore = testPoint;
+            if (runBefore != null) {
+                runBefore.accept(ledgerId, entryId, logIdForOffset(location), posForOffset(location));
+            }
+            super.checkEntry(ledgerId, entryId, location);
+        }
+    }
+
+    TestStatsProvider statsProvider = new TestStatsProvider();
     ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
     LedgerDirsManager ledgerDirsManager;
+    TestableEntryLogger entryLogger;
     InterleavedLedgerStorage interleavedStorage = new InterleavedLedgerStorage();
     final long numWrites = 2000;
     final long entriesPerWrite = 2;
@@ -87,14 +143,16 @@ public void setUp() throws Exception {
         File curDir = Bookie.getCurrentDirectory(tmpDir);
         Bookie.checkDirectoryStructure(curDir);
 
-        conf = TestBKConfiguration.newServerConfiguration();
-        conf.setLedgerDirNames(new String[] { tmpDir.toString() });
-        conf.setEntryLogSizeLimit(2048);
+        conf.setLedgerDirNames(new String[]{tmpDir.toString()});
         ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
                 new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
 
-        interleavedStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager,
-                null, checkpointSource, checkpointer, NullStatsLogger.INSTANCE);
+        entryLogger = new TestableEntryLogger(
+                conf, ledgerDirsManager, null, NullStatsLogger.INSTANCE);
+        interleavedStorage.initializeWithEntryLogger(
+                conf, null, ledgerDirsManager, ledgerDirsManager,
+                null, checkpointSource, checkpointer, entryLogger,
+                statsProvider.getStatsLogger(BOOKIE_SCOPE));
 
         // Insert some ledger & entries in the interleaved storage
         for (long entryId = 0; entryId < numWrites; entryId++) {
@@ -132,41 +190,77 @@ public void testIndexEntryIterator() throws Exception {
     }
 
     @Test
-    public void testConsistencyCheckConcurrentModification() throws Exception {
-        AtomicBoolean done = new AtomicBoolean(false);
-        EntryLogger entryLogger = interleavedStorage.getEntryLogger();
-        List<Exception> asyncErrors = new ArrayList<>();
+    public void testConsistencyCheckConcurrentGC() throws Exception {
+        final long signalDone = -1;
+        final List<Exception> asyncErrors = new ArrayList<>();
+        final LinkedBlockingQueue<Long> toCompact = new LinkedBlockingQueue<>();
+        final Semaphore awaitingCompaction = new Semaphore(0);
+
+        interleavedStorage.flush();
+        final long lastLogId = entryLogger.getLeastUnflushedLogId();
+
+        final MutableInt counter = new MutableInt(0);
+        entryLogger.setCheckEntryTestPoint((ledgerId, entryId, entryLogId, pos) -> {
+            if (entryLogId < lastLogId) {
+                if (counter.intValue() % 100 == 0) {
+                    try {
+                        toCompact.put(entryLogId);
+                        awaitingCompaction.acquire();
+                    } catch (InterruptedException e) {
+                        asyncErrors.add(e);
+                    }
+                }
+                counter.increment();
+            }
+        });
+
         Thread mutator = new Thread(() -> {
             EntryLogCompactor compactor = new EntryLogCompactor(
                     conf,
                     entryLogger,
                     interleavedStorage,
                     entryLogger::removeEntryLog);
-            long next = 0;
-            while (!done.get()) {
+            while (true) {
+                Long next = null;
                 try {
+                    next = toCompact.take();
+                    if (next == null || next == signalDone) {
+                        break;
+                    }
                     compactor.compact(entryLogger.getEntryLogMetadata(next));
-                    next++;
-                } catch (IOException e) {
+                } catch (BufferedChannelBase.BufferedChannelClosedException e) {
+                    // next was already removed, ignore
+                } catch (Exception e) {
                     asyncErrors.add(e);
                     break;
+                } finally {
+                    if (next != null) {
+                        awaitingCompaction.release();
+                    }
                 }
             }
         });
         mutator.start();
 
-        for (int i = 0; i < 100; ++i) {
-            assert interleavedStorage.localConsistencyCheck(Optional.empty()).size() == 0;
-            Thread.sleep(10);
+        List<LedgerStorage.DetectedInconsistency> inconsistencies = interleavedStorage.localConsistencyCheck(
+                Optional.empty());
+        for (LedgerStorage.DetectedInconsistency e: inconsistencies) {
+            LOG.error("Found: {}", e);
         }
+        Assert.assertEquals(0, inconsistencies.size());
 
-        done.set(true);
+        toCompact.offer(signalDone);
         mutator.join();
         for (Exception e: asyncErrors) {
             throw e;
         }
-    }
 
+        if (!conf.isEntryLogPerLedgerEnabled()) {
+            Assert.assertNotEquals(
+                    0,
+                    statsProvider.getCounter(BOOKIE_SCOPE + "." + STORAGE_SCRUB_PAGE_RETRIES).get().longValue());
+        }
+    }
 
     @Test
     public void testConsistencyMissingEntry() throws Exception {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services