You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/04/25 09:11:56 UTC

[GitHub] sijie closed pull request #1360: Issue #1345: entrylogger.flush should flush currentlog first.

sijie closed pull request #1360: Issue #1345: entrylogger.flush should flush currentlog first.
URL: https://github.com/apache/bookkeeper/pull/1360
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index 158525d5f..b1b04769c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -947,8 +947,8 @@ abstract BufferedLogChannel getCurrentLogForLedgerForAddEntry(long ledgerId, int
 
         @Override
         public void flush() throws IOException {
-            flushRotatedLogs();
             flushCurrentLogs();
+            flushRotatedLogs();
         }
 
         void flushLogChannel(BufferedLogChannel logChannel, boolean forceMetadata) throws IOException {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
index 21da95118..d402a85f8 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
@@ -38,12 +38,17 @@
 import java.io.RandomAccessFile;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
 import org.apache.bookkeeper.bookie.EntryLogger.EntryLogManager;
 import org.apache.bookkeeper.bookie.EntryLogger.EntryLogManagerBase;
 import org.apache.bookkeeper.bookie.EntryLogger.EntryLogManagerForSingleEntryLog;
@@ -435,6 +440,120 @@ public void testGetEntryLogsSet() throws Exception {
         assertEquals(Sets.newHashSet(0L, 1L, 2L), entryLogger.getEntryLogsSet());
     }
 
+    /**
+     * In this testcase, entryLogger flush and entryLogger addEntry (which would
+     * call createNewLog) are called concurrently. Since entryLogger flush
+     * method flushes both currentlog and rotatedlogs, it is expected all the
+     * currentLog and rotatedLogs are supposed to be flush and forcewritten.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testFlushOrder() throws Exception {
+        entryLogger.shutdown();
+
+        int logSizeLimit = 256 * 1024;
+        conf.setEntryLogPerLedgerEnabled(false);
+        conf.setEntryLogFilePreAllocationEnabled(false);
+        conf.setFlushIntervalInBytes(0);
+        conf.setEntryLogSizeLimit(logSizeLimit);
+
+        entryLogger = new EntryLogger(conf, dirsMgr);
+        EntryLogManagerBase entryLogManager = (EntryLogManagerBase) entryLogger.getEntryLogManager();
+        AtomicBoolean exceptionHappened = new AtomicBoolean(false);
+
+        CyclicBarrier barrier = new CyclicBarrier(2);
+        List<BufferedLogChannel> rotatedLogChannels;
+        BufferedLogChannel currentActiveChannel;
+
+        exceptionHappened.set(false);
+
+        /*
+         * higher the number of rotated logs, it would be easier to reproduce
+         * the issue regarding flush order
+         */
+        addEntriesAndRotateLogs(entryLogger, 30);
+
+        rotatedLogChannels = new LinkedList<BufferedLogChannel>(entryLogManager.getRotatedLogChannels());
+        currentActiveChannel = entryLogManager.getCurrentLogForLedger(EntryLogger.UNASSIGNED_LEDGERID);
+        long currentActiveChannelUnpersistedBytes = currentActiveChannel.getUnpersistedBytes();
+
+        Thread flushThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    barrier.await();
+                    entryLogger.flush();
+                } catch (InterruptedException | BrokenBarrierException | IOException e) {
+                    LOG.error("Exception happened for entryLogger.flush", e);
+                    exceptionHappened.set(true);
+                }
+            }
+        });
+
+        Thread createdNewLogThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    barrier.await();
+                    /*
+                     * here we are adding entry of size logSizeLimit with
+                     * rolllog=true, so it would create a new entrylog.
+                     */
+                    entryLogger.addEntry(123, generateEntry(123, 456, logSizeLimit), true);
+                } catch (InterruptedException | BrokenBarrierException | IOException e) {
+                    LOG.error("Exception happened for entryLogManager.createNewLog", e);
+                    exceptionHappened.set(true);
+                }
+            }
+        });
+
+        /*
+         * concurrently entryLogger flush and entryLogger addEntry (which would
+         * call createNewLog) would be called from different threads.
+         */
+        flushThread.start();
+        createdNewLogThread.start();
+        flushThread.join();
+        createdNewLogThread.join();
+
+        Assert.assertFalse("Exception happened in one of the operation", exceptionHappened.get());
+
+        /*
+         * if flush of the previous current channel is called then the
+         * unpersistedBytes should be less than what it was before, actually it
+         * would be close to zero (but when new log is created with addEntry
+         * call, ledgers map will be appended at the end of entry log)
+         */
+        Assert.assertTrue(
+                "previous currentChannel unpersistedBytes should be less than " + currentActiveChannelUnpersistedBytes
+                        + ", but it is actually " + currentActiveChannel.getUnpersistedBytes(),
+                currentActiveChannel.getUnpersistedBytes() < currentActiveChannelUnpersistedBytes);
+        for (BufferedLogChannel rotatedLogChannel : rotatedLogChannels) {
+            Assert.assertEquals("previous rotated entrylog should be flushandforcewritten", 0,
+                    rotatedLogChannel.getUnpersistedBytes());
+        }
+    }
+
+    void addEntriesAndRotateLogs(EntryLogger entryLogger, int numOfRotations)
+            throws IOException {
+        EntryLogManagerBase entryLogManager = (EntryLogManagerBase) entryLogger.getEntryLogManager();
+        entryLogManager.setCurrentLogForLedgerAndAddToRotate(EntryLogger.UNASSIGNED_LEDGERID, null);
+        for (int i = 0; i < numOfRotations; i++) {
+            addEntries(entryLogger, 10);
+            entryLogManager.setCurrentLogForLedgerAndAddToRotate(EntryLogger.UNASSIGNED_LEDGERID, null);
+        }
+        addEntries(entryLogger, 10);
+    }
+
+    void addEntries(EntryLogger entryLogger, int noOfEntries) throws IOException {
+        for (int j = 0; j < noOfEntries; j++) {
+            int ledgerId = Math.abs(rand.nextInt());
+            int entryId = Math.abs(rand.nextInt());
+            entryLogger.addEntry(ledgerId, generateEntry(ledgerId, entryId).nioBuffer());
+        }
+    }
+
     static class LedgerStorageWriteTask implements Callable<Boolean> {
         long ledgerId;
         int entryId;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services