You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by sj...@apache.org on 2019/01/04 00:41:42 UTC

[bookkeeper] branch master updated: ISSUE #1892: ILS: reset retry in consistency check loop

This is an automated email from the ASF dual-hosted git repository.

sjust pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new ed7ca37  ISSUE #1892: ILS: reset retry in consistency check loop
ed7ca37 is described below

commit ed7ca37e96639b8d7105653e971c1bd1b4533f8a
Author: Samuel Just <sj...@salesforce.com>
AuthorDate: Thu Jan 3 16:41:37 2019 -0800

    ISSUE #1892: ILS: reset retry in consistency check loop
    
    This patch additionally:
    - modifies InterleavedLedgerStorageTest.java to test with and without
      entryLogPerLedger
    - refactors the test a bit to ensure that the gc calls really do race
      with the right part of the checker
    - addresses a few other, more cosmetic issues (stale comments and Javadoc)
    
    (bug W-5721713)
    Signed-off-by: Samuel Just <sj...@salesforce.com>
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>
    
    This closes #1894 from athanatos/forupstream/wip-1892, closes #1892
---
 .../bookkeeper/bookie/BookKeeperServerStats.java   |   1 +
 .../org/apache/bookkeeper/bookie/BookieShell.java  |   2 +-
 .../org/apache/bookkeeper/bookie/EntryLogger.java  |  10 +-
 .../bookie/InterleavedLedgerStorage.java           |  31 ++++-
 .../bookie/InterleavedLedgerStorageTest.java       | 132 ++++++++++++++++++---
 5 files changed, 152 insertions(+), 24 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
index 69d0eda..cdafd15 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
@@ -111,6 +111,7 @@ public interface BookKeeperServerStats {
 
     // Ledger Storage Scrub Stats
     String STORAGE_SCRUB_PAGES_SCANNED = "STORAGE_SCRUB_PAGES_SCANNED";
+    String STORAGE_SCRUB_PAGE_RETRIES = "STORAGE_SCRUB_PAGE_RETRIES";
 
     // Ledger Cache Stats
     String LEDGER_CACHE_READ_PAGE = "LEDGER_CACHE_READ_PAGE";
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index 32e288b..f53195a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -1234,7 +1234,7 @@ public class BookieShell implements Tool {
     }
 
     /**
-     * Print the metadata for a ledger.
+     * Check local storage for inconsistencies.
      */
     class LocalConsistencyCheck extends MyCommand {
         Options lOpts = new Options();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index 6b41b6c..af82620 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -696,6 +696,10 @@ public class EntryLogger {
     }
 
 
+    static long posForOffset(long location) {
+        return location & 0xffffffffL;
+    }
+
 
     /**
      * Exception type for representing lookup errors.  Useful for disambiguating different error
@@ -778,7 +782,7 @@ public class EntryLogger {
             throws EntryLookupException, IOException {
         ByteBuf sizeBuff = sizeBuffer.get();
         sizeBuff.clear();
-        pos -= 4; // we want to get the ledgerId and length to check
+        pos -= 4; // we want to get the entrySize as well as the ledgerId and entryId
         BufferedReadChannel fc;
         try {
             fc = getChannelForLogId(entryLogId);
@@ -817,14 +821,14 @@ public class EntryLogger {
 
     void checkEntry(long ledgerId, long entryId, long location) throws EntryLookupException, IOException {
         long entryLogId = logIdForOffset(location);
-        long pos = location & 0xffffffffL;
+        long pos = posForOffset(location);
         getFCForEntryInternal(ledgerId, entryId, entryLogId, pos);
     }
 
     public ByteBuf internalReadEntry(long ledgerId, long entryId, long location)
             throws IOException, Bookie.NoEntryException {
         long entryLogId = logIdForOffset(location);
-        long pos = location & 0xffffffffL;
+        long pos = posForOffset(location);
 
         final EntryLogEntry entry;
         try {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index 6078d69..d1287c3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -29,6 +29,7 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ENTRYLOGGER_SCO
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_GET_ENTRY;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_GET_OFFSET;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_SCRUB_PAGES_SCANNED;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_SCRUB_PAGE_RETRIES;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
@@ -55,6 +56,7 @@ import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.proto.BookieProtocol;
+import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.annotations.StatsDoc;
@@ -112,6 +114,7 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
     )
     private OpStatsLogger getEntryStats;
     private OpStatsLogger pageScanStats;
+    private Counter retryCounter;
 
     @VisibleForTesting
     public InterleavedLedgerStorage() {
@@ -149,11 +152,34 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
                                         Checkpointer checkpointer,
                                         EntryLogListener entryLogListener,
                                         StatsLogger statsLogger) throws IOException {
+        initializeWithEntryLogger(
+                conf,
+                ledgerManager,
+                ledgerDirsManager,
+                indexDirsManager,
+                stateManager,
+                checkpointSource,
+                checkpointer,
+                new EntryLogger(conf, ledgerDirsManager, entryLogListener, statsLogger.scope(ENTRYLOGGER_SCOPE)),
+                statsLogger);
+    }
+
+    @VisibleForTesting
+    public void initializeWithEntryLogger(ServerConfiguration conf,
+                LedgerManager ledgerManager,
+                LedgerDirsManager ledgerDirsManager,
+                LedgerDirsManager indexDirsManager,
+                StateManager stateManager,
+                CheckpointSource checkpointSource,
+                Checkpointer checkpointer,
+                EntryLogger entryLogger,
+                StatsLogger statsLogger) throws IOException {
         checkNotNull(checkpointSource, "invalid null checkpoint source");
         checkNotNull(checkpointer, "invalid null checkpointer");
+        this.entryLogger = entryLogger;
+        this.entryLogger.addListener(this);
         this.checkpointSource = checkpointSource;
         this.checkpointer = checkpointer;
-        entryLogger = new EntryLogger(conf, ledgerDirsManager, entryLogListener, statsLogger.scope(ENTRYLOGGER_SCOPE));
         ledgerCache = new LedgerCacheImpl(conf, activeLedgers,
                 null == indexDirsManager ? ledgerDirsManager : indexDirsManager, statsLogger);
         gcThread = new GarbageCollectorThread(conf, ledgerManager, this, statsLogger.scope("gc"));
@@ -162,6 +188,7 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
         getOffsetStats = statsLogger.getOpStatsLogger(STORAGE_GET_OFFSET);
         getEntryStats = statsLogger.getOpStatsLogger(STORAGE_GET_ENTRY);
         pageScanStats = statsLogger.getOpStatsLogger(STORAGE_SCRUB_PAGES_SCANNED);
+        retryCounter = statsLogger.getCounter(STORAGE_SCRUB_PAGE_RETRIES);
     }
 
     private LedgerDirsListener getLedgerDirsListener() {
@@ -538,6 +565,7 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
                     @Cleanup LedgerEntryPage lep = page.getLEP();
                     MutableBoolean retry = new MutableBoolean(false);
                     do {
+                        retry.setValue(false);
                         int version = lep.getVersion();
 
                         MutableBoolean success = new MutableBoolean(true);
@@ -556,6 +584,7 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
                                     } else {
                                         LOG.debug("localConsistencyCheck: concurrent modification, retrying");
                                         retry.setValue(true);
+                                        retryCounter.inc();
                                     }
                                     return false;
                                 } else {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java
index fd4f314..4fde8e7 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java
@@ -20,6 +20,8 @@
  */
 package org.apache.bookkeeper.bookie;
 
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_SCRUB_PAGE_RETRIES;
 import static org.junit.Assert.assertEquals;
 
 import io.netty.buffer.ByteBuf;
@@ -27,9 +29,11 @@ import io.netty.buffer.Unpooled;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -37,18 +41,37 @@ import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.test.TestStatsProvider;
 import org.apache.bookkeeper.util.DiskChecker;
 import org.apache.bookkeeper.util.EntryFormatter;
 import org.apache.bookkeeper.util.LedgerIdFormatter;
+import org.apache.commons.lang.mutable.MutableInt;
 import org.apache.commons.lang.mutable.MutableLong;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Test for InterleavedLedgerStorage.
  */
+@RunWith(Parameterized.class)
 public class InterleavedLedgerStorageTest {
+    private static final Logger LOG = LoggerFactory.getLogger(InterleavedLedgerStorageTest.class);
+
+    @Parameterized.Parameters
+    public static Iterable<Boolean> elplSetting() {
+        return Arrays.asList(true, false);
+    }
+
+    public InterleavedLedgerStorageTest(boolean elplSetting) {
+        conf.setEntryLogSizeLimit(2048);
+        conf.setEntryLogPerLedgerEnabled(elplSetting);
+    }
 
     CheckpointSource checkpointSource = new CheckpointSource() {
         @Override
@@ -73,8 +96,41 @@ public class InterleavedLedgerStorageTest {
         }
     };
 
+    static class TestableEntryLogger extends EntryLogger {
+        public interface CheckEntryListener {
+            void accept(long ledgerId,
+                        long entryId,
+                        long entryLogId,
+                        long pos);
+        }
+        volatile CheckEntryListener testPoint;
+
+        public TestableEntryLogger(
+                ServerConfiguration conf,
+                LedgerDirsManager ledgerDirsManager,
+                EntryLogListener listener,
+                StatsLogger statsLogger) throws IOException {
+            super(conf, ledgerDirsManager, listener, statsLogger);
+        }
+
+        void setCheckEntryTestPoint(CheckEntryListener testPoint) throws InterruptedException {
+            this.testPoint = testPoint;
+        }
+
+        @Override
+        void checkEntry(long ledgerId, long entryId, long location) throws EntryLookupException, IOException {
+            CheckEntryListener runBefore = testPoint;
+            if (runBefore != null) {
+                runBefore.accept(ledgerId, entryId, logIdForOffset(location), posForOffset(location));
+            }
+            super.checkEntry(ledgerId, entryId, location);
+        }
+    }
+
+    TestStatsProvider statsProvider = new TestStatsProvider();
     ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
     LedgerDirsManager ledgerDirsManager;
+    TestableEntryLogger entryLogger;
     InterleavedLedgerStorage interleavedStorage = new InterleavedLedgerStorage();
     final long numWrites = 2000;
     final long entriesPerWrite = 2;
@@ -87,14 +143,16 @@ public class InterleavedLedgerStorageTest {
         File curDir = Bookie.getCurrentDirectory(tmpDir);
         Bookie.checkDirectoryStructure(curDir);
 
-        conf = TestBKConfiguration.newServerConfiguration();
-        conf.setLedgerDirNames(new String[] { tmpDir.toString() });
-        conf.setEntryLogSizeLimit(2048);
+        conf.setLedgerDirNames(new String[]{tmpDir.toString()});
         ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
                 new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
 
-        interleavedStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager,
-                null, checkpointSource, checkpointer, NullStatsLogger.INSTANCE);
+        entryLogger = new TestableEntryLogger(
+                conf, ledgerDirsManager, null, NullStatsLogger.INSTANCE);
+        interleavedStorage.initializeWithEntryLogger(
+                conf, null, ledgerDirsManager, ledgerDirsManager,
+                null, checkpointSource, checkpointer, entryLogger,
+                statsProvider.getStatsLogger(BOOKIE_SCOPE));
 
         // Insert some ledger & entries in the interleaved storage
         for (long entryId = 0; entryId < numWrites; entryId++) {
@@ -132,41 +190,77 @@ public class InterleavedLedgerStorageTest {
     }
 
     @Test
-    public void testConsistencyCheckConcurrentModification() throws Exception {
-        AtomicBoolean done = new AtomicBoolean(false);
-        EntryLogger entryLogger = interleavedStorage.getEntryLogger();
-        List<Exception> asyncErrors = new ArrayList<>();
+    public void testConsistencyCheckConcurrentGC() throws Exception {
+        final long signalDone = -1;
+        final List<Exception> asyncErrors = new ArrayList<>();
+        final LinkedBlockingQueue<Long> toCompact = new LinkedBlockingQueue<>();
+        final Semaphore awaitingCompaction = new Semaphore(0);
+
+        interleavedStorage.flush();
+        final long lastLogId = entryLogger.getLeastUnflushedLogId();
+
+        final MutableInt counter = new MutableInt(0);
+        entryLogger.setCheckEntryTestPoint((ledgerId, entryId, entryLogId, pos) -> {
+            if (entryLogId < lastLogId) {
+                if (counter.intValue() % 100 == 0) {
+                    try {
+                        toCompact.put(entryLogId);
+                        awaitingCompaction.acquire();
+                    } catch (InterruptedException e) {
+                        asyncErrors.add(e);
+                    }
+                }
+                counter.increment();
+            }
+        });
+
         Thread mutator = new Thread(() -> {
             EntryLogCompactor compactor = new EntryLogCompactor(
                     conf,
                     entryLogger,
                     interleavedStorage,
                     entryLogger::removeEntryLog);
-            long next = 0;
-            while (!done.get()) {
+            while (true) {
+                Long next = null;
                 try {
+                    next = toCompact.take();
+                    if (next == null || next == signalDone) {
+                        break;
+                    }
                     compactor.compact(entryLogger.getEntryLogMetadata(next));
-                    next++;
-                } catch (IOException e) {
+                } catch (BufferedChannelBase.BufferedChannelClosedException e) {
+                    // next was already removed, ignore
+                } catch (Exception e) {
                     asyncErrors.add(e);
                     break;
+                } finally {
+                    if (next != null) {
+                        awaitingCompaction.release();
+                    }
                 }
             }
         });
         mutator.start();
 
-        for (int i = 0; i < 100; ++i) {
-            assert interleavedStorage.localConsistencyCheck(Optional.empty()).size() == 0;
-            Thread.sleep(10);
+        List<LedgerStorage.DetectedInconsistency> inconsistencies = interleavedStorage.localConsistencyCheck(
+                Optional.empty());
+        for (LedgerStorage.DetectedInconsistency e: inconsistencies) {
+            LOG.error("Found: {}", e);
         }
+        Assert.assertEquals(0, inconsistencies.size());
 
-        done.set(true);
+        toCompact.offer(signalDone);
         mutator.join();
         for (Exception e: asyncErrors) {
             throw e;
         }
-    }
 
+        if (!conf.isEntryLogPerLedgerEnabled()) {
+            Assert.assertNotEquals(
+                    0,
+                    statsProvider.getCounter(BOOKIE_SCOPE + "." + STORAGE_SCRUB_PAGE_RETRIES).get().longValue());
+        }
+    }
 
     @Test
     public void testConsistencyMissingEntry() throws Exception {