You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/03/09 06:16:55 UTC

[GitHub] sijie closed pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage

sijie closed pull request #1225: Issue #570: getting rid of unnecessary synchronization in InterleavedLedgerStorage
URL: https://github.com/apache/bookkeeper/pull/1225
 
 
   

This PR was merged from a forked repository (a foreign pull request).
Because GitHub hides the original diff of such a pull request once it
is merged, the diff is reproduced below for the sake of provenance:

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index fdfef0ab7..81de730e4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -34,6 +34,8 @@
 import java.util.NavigableMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.bookie.EntryLogger.EntryLogListener;
@@ -74,7 +76,7 @@
     GarbageCollectorThread gcThread;
 
     // this indicates that a write has happened since the last flush
-    private volatile boolean somethingWritten = false;
+    private final AtomicBoolean somethingWritten = new AtomicBoolean(false);
 
     // Expose Stats
     private OpStatsLogger getOffsetStats;
@@ -264,7 +266,7 @@ public boolean waitForLastAddConfirmedUpdate(long ledgerId,
 
 
     @Override
-    public synchronized long addEntry(ByteBuf entry) throws IOException {
+    public long addEntry(ByteBuf entry) throws IOException {
         long ledgerId = entry.getLong(entry.readerIndex() + 0);
         long entryId = entry.getLong(entry.readerIndex() + 8);
         long lac = entry.getLong(entry.readerIndex() + 16);
@@ -360,10 +362,9 @@ public void checkpoint(Checkpoint checkpoint) throws IOException {
 
     @Override
     public synchronized void flush() throws IOException {
-        if (!somethingWritten) {
+        if (!somethingWritten.compareAndSet(true, false)) {
             return;
         }
-        somethingWritten = false;
         flushOrCheckpoint(false);
     }
 
@@ -419,12 +420,12 @@ protected void processEntry(long ledgerId, long entryId, ByteBuf entry) throws I
         processEntry(ledgerId, entryId, entry, true);
     }
 
-    protected synchronized void processEntry(long ledgerId, long entryId, ByteBuf entry, boolean rollLog)
+    protected void processEntry(long ledgerId, long entryId, ByteBuf entry, boolean rollLog)
             throws IOException {
         /*
          * Touch dirty flag
          */
-        somethingWritten = true;
+        somethingWritten.set(true);
 
         /*
          * Log the entry
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
index 8568617e0..80d3a6526 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
@@ -94,7 +94,7 @@ LedgerUnderreplicationManager newLedgerUnderreplicationManager()
      * @param lm
      *            Layout manager
      */
-    void format(final AbstractConfiguration<?> conf, final LayoutManager lm)
+    void format(AbstractConfiguration<?> conf, LayoutManager lm)
             throws InterruptedException, KeeperException, IOException;
 
     /**
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
index c60a997a2..d4a98cee0 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
@@ -36,8 +36,13 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
@@ -106,8 +111,8 @@ public void testCorruptEntryLog() throws Exception {
         assertTrue(meta.getLedgersMap().containsKey(3L));
     }
 
-    private ByteBuf generateEntry(long ledger, long entry) {
-        byte[] data = ("ledger-" + ledger + "-" + entry).getBytes();
+    private static ByteBuf generateEntry(long ledger, long entry) {
+        byte[] data = generateDataString(ledger, entry).getBytes();
         ByteBuf bb = Unpooled.buffer(8 + 8 + data.length);
         bb.writeLong(ledger);
         bb.writeLong(entry);
@@ -115,6 +120,10 @@ private ByteBuf generateEntry(long ledger, long entry) {
         return bb;
     }
 
+    private static String generateDataString(long ledger, long entry) {
+        return ("ledger-" + ledger + "-" + entry);
+    }
+
     @Test
     public void testMissingLogId() throws Exception {
         File tmpDir = createTempDir("entryLogTest", ".dir");
@@ -391,4 +400,160 @@ public void testGetEntryLogsSet() throws Exception {
 
         assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), logger.getEntryLogsSet());
     }
+
+    static class LedgerStorageWriteTask implements Callable<Boolean> {
+        long ledgerId;
+        int entryId;
+        LedgerStorage ledgerStorage;
+
+        LedgerStorageWriteTask(long ledgerId, int entryId, LedgerStorage ledgerStorage) {
+            this.ledgerId = ledgerId;
+            this.entryId = entryId;
+            this.ledgerStorage = ledgerStorage;
+        }
+
+        @Override
+        public Boolean call() throws IOException {
+            try {
+                ledgerStorage.addEntry(generateEntry(ledgerId, entryId));
+            } catch (IOException e) {
+                LOG.error("Got Exception for AddEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, e);
+                throw new IOException("Got Exception for AddEntry call. LedgerId: " + ledgerId + " entryId: " + entryId,
+                        e);
+            }
+            return true;
+        }
+    }
+
+    static class LedgerStorageFlushTask implements Callable<Boolean> {
+        LedgerStorage ledgerStorage;
+
+        LedgerStorageFlushTask(LedgerStorage ledgerStorage) {
+            this.ledgerStorage = ledgerStorage;
+        }
+
+        @Override
+        public Boolean call() throws IOException {
+            try {
+                ledgerStorage.flush();
+            } catch (IOException e) {
+                LOG.error("Got Exception for flush call", e);
+                throw new IOException("Got Exception for Flush call", e);
+            }
+            return true;
+        }
+    }
+
+    static class LedgerStorageReadTask implements Callable<Boolean> {
+        long ledgerId;
+        int entryId;
+        LedgerStorage ledgerStorage;
+
+        LedgerStorageReadTask(long ledgerId, int entryId, LedgerStorage ledgerStorage) {
+            this.ledgerId = ledgerId;
+            this.entryId = entryId;
+            this.ledgerStorage = ledgerStorage;
+        }
+
+        @Override
+        public Boolean call() throws IOException {
+            try {
+                ByteBuf expectedByteBuf = generateEntry(ledgerId, entryId);
+                ByteBuf actualByteBuf = ledgerStorage.getEntry(ledgerId, entryId);
+                if (!expectedByteBuf.equals(actualByteBuf)) {
+                    LOG.error("Expected Entry: {} Actual Entry: {}", expectedByteBuf.toString(Charset.defaultCharset()),
+                            actualByteBuf.toString(Charset.defaultCharset()));
+                    throw new IOException("Expected Entry: " + expectedByteBuf.toString(Charset.defaultCharset())
+                            + " Actual Entry: " + actualByteBuf.toString(Charset.defaultCharset()));
+                }
+            } catch (IOException e) {
+                LOG.error("Got Exception for GetEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, e);
+                throw new IOException("Got Exception for GetEntry call. LedgerId: " + ledgerId + " entryId: " + entryId,
+                        e);
+            }
+            return true;
+        }
+    }
+
+    /**
+     * Tests concurrent write/flush operations followed by concurrent
+     * read/flush operations against InterleavedLedgerStorage.
+     */
+    @Test
+    public void testConcurrentWriteAndReadCallsOfInterleavedLedgerStorage() throws Exception {
+        File ledgerDir = createTempDir("bkTest", ".dir");
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setJournalDirName(ledgerDir.toString());
+        conf.setLedgerDirNames(new String[] { ledgerDir.getAbsolutePath()});
+        conf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName());
+        Bookie bookie = new Bookie(conf);
+        InterleavedLedgerStorage ledgerStorage = ((InterleavedLedgerStorage) bookie.ledgerStorage);
+        Random rand = new Random(0);
+
+        int numOfLedgers = 70;
+        int numEntries = 1500;
+        // Create ledgers
+        for (int i = 0; i < numOfLedgers; i++) {
+            ledgerStorage.setMasterKey(i, "key".getBytes());
+        }
+
+        ExecutorService executor = Executors.newFixedThreadPool(10);
+        List<Callable<Boolean>> writeAndFlushTasks = new ArrayList<Callable<Boolean>>();
+        for (int j = 0; j < numEntries; j++) {
+            for (int i = 0; i < numOfLedgers; i++) {
+                writeAndFlushTasks.add(new LedgerStorageWriteTask(i, j, ledgerStorage));
+            }
+        }
+
+        /*
+         * Insert some flush tasks at random positions in the list of write tasks.
+         */
+        for (int i = 0; i < (numOfLedgers * numEntries) / 500; i++) {
+            writeAndFlushTasks.add(rand.nextInt(writeAndFlushTasks.size()), new LedgerStorageFlushTask(ledgerStorage));
+        }
+
+        // invoke all those write/flush tasks all at once concurrently
+        executor.invokeAll(writeAndFlushTasks).forEach((future) -> {
+            try {
+                future.get();
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                LOG.error("Write/Flush task failed because of InterruptedException", ie);
+                Assert.fail("Write/Flush task interrupted");
+            } catch (Exception ex) {
+                LOG.error("Write/Flush task failed because of  exception", ex);
+                Assert.fail("Write/Flush task failed " + ex.getMessage());
+            }
+        });
+
+        List<Callable<Boolean>> readAndFlushTasks = new ArrayList<Callable<Boolean>>();
+        for (int j = 0; j < numEntries; j++) {
+            for (int i = 0; i < numOfLedgers; i++) {
+                readAndFlushTasks.add(new LedgerStorageReadTask(i, j, ledgerStorage));
+            }
+        }
+
+        /*
+         * Insert some flush tasks at random positions in the list of read tasks.
+         */
+        for (int i = 0; i < (numOfLedgers * numEntries) / 500; i++) {
+            readAndFlushTasks.add(rand.nextInt(readAndFlushTasks.size()), new LedgerStorageFlushTask(ledgerStorage));
+        }
+
+        // invoke all those read/flush tasks all at once concurrently
+        executor.invokeAll(readAndFlushTasks).forEach((future) -> {
+            try {
+                future.get();
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                LOG.error("Read/Flush task failed because of InterruptedException", ie);
+                Assert.fail("Read/Flush task interrupted");
+            } catch (Exception ex) {
+                LOG.error("Read/Flush task failed because of  exception", ex);
+                Assert.fail("Read/Flush task failed " + ex.getMessage());
+            }
+        });
+
+        executor.shutdownNow();
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services