You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/08/15 07:36:31 UTC

[bookkeeper] branch master updated: Provide consistent locking mechanism in EntryLogManagerForEntryLogPerLedger

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new b0c04df  Provide consistent locking mechanism in EntryLogManagerForEntryLogPerLedger
b0c04df is described below

commit b0c04dffdf0c6f243ef9f17326c9e5ad83d20544
Author: cguttapalem <cg...@salesforce.com>
AuthorDate: Wed Aug 15 00:36:23 2018 -0700

    Provide consistent locking mechanism in EntryLogManagerForEntryLogPerLedger
    
    Descriptions of the changes in this PR:
    
    Assumption: The lock stored alongside the EntryLog in this map is meant
    to be used to ensure that no two threads can be writing to the same
    ledger at the same time.
    
    The above invariant can be violated if the EntryLogAndLockTuple
    object is evicted from the cache while in a critical section nominally
    protected by the contained lock.
    
    The conditions required for this to happen would be pretty odd --
    there needs to be a huge amount of cache churn during one of
    the protected operations.
    
    The fix for this issue is to allocate in the constructor a fixed array of locks and select
    for each EntryLogAndLockTuple a lock from that array
    deterministically by ledgerId such that the same ledgerId
    will always get the same lock.
    
    Author: cguttapalem <cg...@salesforce.com>
    
    Reviewers: Sijie Guo <si...@apache.org>
    
    This closes #1600 from reddycharan/elmlock
---
 .../EntryLogManagerForEntryLogPerLedger.java       |  15 ++-
 .../apache/bookkeeper/bookie/CreateNewLogTest.java | 117 +++++++++++++++++++++
 .../org/apache/bookkeeper/bookie/EntryLogTest.java |  13 ++-
 3 files changed, 138 insertions(+), 7 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java
index 41e66d9..96f38c5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java
@@ -49,6 +49,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReferenceArray;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -87,12 +88,16 @@ class EntryLogManagerForEntryLogPerLedger extends EntryLogManagerBase {
         }
     }
 
-    static class EntryLogAndLockTuple {
+    class EntryLogAndLockTuple {
         private final Lock ledgerLock;
         private BufferedLogChannelWithDirInfo entryLogWithDirInfo;
 
-        private EntryLogAndLockTuple() {
-            ledgerLock = new ReentrantLock();
+        private EntryLogAndLockTuple(long ledgerId) {
+            // Long.hashCode may be negative; floorMod (unlike %) always yields an index in
+            // [0, length), so a given ledgerId maps to the same valid pool slot every time.
+            int lockIndex = Math.floorMod(Long.hashCode(ledgerId), lockArrayPool.length());
+            lockArrayPool.compareAndSet(lockIndex, null, new ReentrantLock());
+            ledgerLock = lockArrayPool.get(lockIndex);
         }
 
         private Lock getLedgerLock() {
@@ -212,6 +217,7 @@ class EntryLogManagerForEntryLogPerLedger extends EntryLogManagerBase {
         }
     }
 
+    private final AtomicReferenceArray<Lock> lockArrayPool;
     private final LoadingCache<Long, EntryLogAndLockTuple> ledgerIdEntryLogMap;
     /*
      * every time active logChannel is accessed from ledgerIdEntryLogMap
@@ -244,10 +250,11 @@ class EntryLogManagerForEntryLogPerLedger extends EntryLogManagerBase {
         this.entryLogPerLedgerCounterLimitsMultFactor = conf.getEntryLogPerLedgerCounterLimitsMultFactor();
 
         ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
+        this.lockArrayPool = new AtomicReferenceArray<Lock>(maximumNumberOfActiveEntryLogs * 2);
         this.entryLogAndLockTupleCacheLoader = new CacheLoader<Long, EntryLogAndLockTuple>() {
             @Override
             public EntryLogAndLockTuple load(Long key) throws Exception {
-                return new EntryLogAndLockTuple();
+                return new EntryLogAndLockTuple(key);
             }
         };
         /*
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
index 3cdfd9f..f5d4edc 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
@@ -34,10 +34,12 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
 import java.util.stream.IntStream;
 
 import org.apache.bookkeeper.bookie.EntryLogManagerForEntryLogPerLedger.BufferedLogChannelWithDirInfo;
 import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
+import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.stats.Counter;
@@ -769,4 +771,119 @@ public class CreateNewLogTest {
             entrylogManager.createNewLog(ledgerId);
         }
     }
+
+    @Test
+    public void testLockConsistency() throws Exception {
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+
+        conf.setLedgerDirNames(ledgerDirs);
+        conf.setEntryLogFilePreAllocationEnabled(false);
+        conf.setEntryLogPerLedgerEnabled(true);
+        conf.setMaximumNumberOfActiveEntryLogs(5);
+
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicInteger count = new AtomicInteger(0);
+
+        /*
+         * Inject a wait into the 'getWritableLedgerDirsForNewLog' method of
+         * ledgerDirsManager. getWritableLedgerDirsForNewLog will be called when
+         * entryLogManager.createNewLog is called.
+         */
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())) {
+            /*
+             * The first time getWritableLedgerDirsForNewLog is called, it
+             * awaits on 'latch' before delegating to
+             * super.getWritableLedgerDirsForNewLog.
+             */
+            @Override
+            public List<File> getWritableLedgerDirsForNewLog() throws NoWritableLedgerDirException {
+                if (count.incrementAndGet() == 1) {
+                    try {
+                        latch.await();
+                    } catch (InterruptedException e) {
+                        LOG.error("Got InterruptedException while awaiting for latch countdown", e);
+                    }
+                }
+                return super.getWritableLedgerDirsForNewLog();
+            }
+        };
+
+        EntryLogger el = new EntryLogger(conf, ledgerDirsManager);
+        EntryLogManagerForEntryLogPerLedger entryLogManager = (EntryLogManagerForEntryLogPerLedger) el
+                .getEntryLogManager();
+
+        long firstLedgerId = 100L;
+        AtomicBoolean newLogCreated = new AtomicBoolean(false);
+
+        Assert.assertFalse("EntryLogManager cacheMap should not contain entry for firstLedgerId",
+                entryLogManager.getCacheAsMap().containsKey(firstLedgerId));
+        Assert.assertEquals("Value of the count should be 0", 0, count.get());
+        /*
+         * In a new thread, create a new log for 'firstLedgerId' and then set
+         * 'newLogCreated' to true. Since this is the first createNewLog call,
+         * it is going to be blocked until the latch is counted down to 0.
+         */
+        new Thread() {
+            @Override
+            public void run() {
+                try {
+                    entryLogManager.createNewLog(firstLedgerId);
+                    newLogCreated.set(true);
+                } catch (IOException e) {
+                    LOG.error("Got IOException while creating new log", e);
+                }
+            }
+        }.start();
+
+        /*
+         * Wait until the entry for 'firstLedgerId' is created in cacheMap. It
+         * will be created because createNewLog is called in the other thread.
+         */
+        while (!entryLogManager.getCacheAsMap().containsKey(firstLedgerId)) {
+            Thread.sleep(200);
+        }
+        Lock firstLedgersLock = entryLogManager.getLock(firstLedgerId);
+
+        /*
+         * Since 'latch' has not been counted down, the new log should not be
+         * created even after waiting for 2 secs.
+         */
+        Thread.sleep(2000);
+        Assert.assertFalse("New log shouldn't have created", newLogCreated.get());
+
+        /*
+         * Create MaximumNumberOfActiveEntryLogs entry logs and do cache
+         * cleanup, so that the earliest entry is removed from the cache.
+         */
+        for (int i = 1; i <= conf.getMaximumNumberOfActiveEntryLogs(); i++) {
+            entryLogManager.createNewLog(firstLedgerId + i);
+        }
+        entryLogManager.doEntryLogMapCleanup();
+        Assert.assertFalse("Entry for that ledger shouldn't be there",
+                entryLogManager.getCacheAsMap().containsKey(firstLedgerId));
+
+        /*
+         * Now count down the latch so that the other thread can make progress
+         * with createNewLog. Since this entry has been evicted from the cache,
+         * the newly created entry log is expected to be added to the rotated
+         * log channels.
+         */
+        latch.countDown();
+        while (!newLogCreated.get()) {
+            Thread.sleep(200);
+        }
+        while (entryLogManager.getRotatedLogChannels().size() < 1) {
+            Thread.sleep(200);
+        }
+
+        /*
+         * The entry for 'firstLedgerId' is removed from the cache, but even in
+         * this case the lock we get for 'firstLedgerId' should be the same as
+         * the one we got earlier.
+         */
+        Lock lockForThatLedgerAfterRemoval = entryLogManager.getLock(firstLedgerId);
+        Assert.assertEquals("For a given ledger lock should be the same before and after removal", firstLedgersLock,
+                lockForThatLedgerAfterRemoval);
+    }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
index 1dc535c..4b52a09 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
@@ -1003,10 +1003,17 @@ public class EntryLogTest {
         latchToStart.countDown();
         Thread.sleep(1000);
         /*
-         * since there are only "numOfLedgers" ledgers, only "numOfLedgers" threads should have been able to acquire
-         * lock. After acquiring the lock there must be waiting on 'latchToWait' latch
+         * since there are only "numOfLedgers" ledgers, at most "numOfLedgers"
+         * threads should have been able to acquire a lock; it can be fewer
+         * because multiple ledgers can end up sharing the same lock when
+         * their hashcodes fall in the same bucket.
+         *
+         *
+         * After acquiring the lock they must be waiting on 'latchToWait' latch
          */
-        assertEquals("Number Of Threads acquired Lock", numOfLedgers, numberOfThreadsAcquiredLock.get());
+        int currentNumberOfThreadsAcquiredLock = numberOfThreadsAcquiredLock.get();
+        assertTrue("Number Of Threads acquired Lock " + currentNumberOfThreadsAcquiredLock,
+                (currentNumberOfThreadsAcquiredLock > 0) && (currentNumberOfThreadsAcquiredLock <= numOfLedgers));
         latchToWait.countDown();
         Thread.sleep(2000);
         assertEquals("Number Of Threads acquired Lock", numOfLedgers * numOfThreadsPerLedger,