You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/04/23 17:28:29 UTC

[GitHub] reddycharan commented on a change in pull request #1360: Issue #1345: entrylogger.flush should flush currentlog first.

reddycharan commented on a change in pull request #1360: Issue #1345: entrylogger.flush should flush currentlog first.
URL: https://github.com/apache/bookkeeper/pull/1360#discussion_r183475935
 
 

 ##########
 File path: bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
 ##########
 @@ -435,6 +440,121 @@ public void testGetEntryLogsSet() throws Exception {
         assertEquals(Sets.newHashSet(0L, 1L, 2L), entryLogger.getEntryLogsSet());
     }
 
+    /**
+     * In this testcase, entryLogger flush and entryLogger addEntry (which would
+     * call createNewLog) are called concurrently. Since entryLogger flush
+     * method flushes both currentlog and rotatedlogs, it is expected all the
+     * currentLog and rotatedLogs are supposed to be flush and forcewritten.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testFlushOrder() throws Exception {
+        entryLogger.shutdown();
+
+        int logSizeLimit = 256 * 1024;
+        conf.setEntryLogPerLedgerEnabled(false);
+        conf.setEntryLogFilePreAllocationEnabled(false);
+        conf.setFlushIntervalInBytes(0);
+        conf.setEntryLogSizeLimit(logSizeLimit);
+
+        entryLogger = new EntryLogger(conf, dirsMgr);
+        EntryLogManagerBase entryLogManager = (EntryLogManagerBase) entryLogger.getEntryLogManager();
+        AtomicBoolean exceptionHappened = new AtomicBoolean(false);
+
+        CyclicBarrier barrier = new CyclicBarrier(2);
+        List<BufferedLogChannel> rotatedLogChannels;
+        BufferedLogChannel currentActiveChannel;
+
+        exceptionHappened.set(false);
+
+        /*
+         * higher the number of rotated logs, it would be easier to reproduce
+         * the issue regarding flush order
+         */
+        addEntriesAndRotateLogs(entryLogger, 30);
+
+        rotatedLogChannels = new LinkedList<BufferedLogChannel>(entryLogManager.getRotatedLogChannels());
+        currentActiveChannel = entryLogManager.getCurrentLogForLedger(EntryLogger.UNASSIGNED_LEDGERID);
+        long currentActiveChannelUnpersistedBytes = currentActiveChannel.getUnpersistedBytes();
+
+        Thread flushThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    barrier.await();
+                    entryLogger.flush();
+                } catch (InterruptedException | BrokenBarrierException | IOException e) {
+                    LOG.error("Exception happened for entryLogger.flush", e);
+                    exceptionHappened.set(true);
+                }
+            }
+        });
+
+        Thread createdNewLogThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    barrier.await();
+                    /*
+                     * here we are adding entry of size logSizeLimit with
+                     * rolllog=true, so it would create a new entrylog.
+                     */
+                    entryLogger.addEntry(123, generateEntry(123, 456, logSizeLimit), true);
+                } catch (InterruptedException | BrokenBarrierException | IOException e) {
+                    LOG.error("Exception happened for entryLogManager.createNewLog", e);
+                    exceptionHappened.set(true);
+                }
+            }
+        });
+
+        /*
+         * concurrently entryLogger flush and entryLogger addEntry (which would
+         * call createNewLog) would be called from different threads.
+         */
+        flushThread.start();
+        createdNewLogThread.start();
+        flushThread.join();
+        createdNewLogThread.join();
+
+        Assert.assertFalse("Exception happened in one of the operation", exceptionHappened.get());
+
+        /*
+         * if flush of the previous current channel is called then the
+         * unpersistedBytes should be less than what it was before, actually it
+         * would be close to zero (but when new log is created with addEntry
+         * call, ledgers map will be appended at the end of entry log)
+         */
+        Assert.assertTrue(
+                "previous currentChannel unpersistedBytes should be less than " + currentActiveChannelUnpersistedBytes
+                        + ", but it is actually " + currentActiveChannel.getUnpersistedBytes(),
+                currentActiveChannel.getUnpersistedBytes() < currentActiveChannelUnpersistedBytes);
+        for (BufferedLogChannel rotatedLogChannel : rotatedLogChannels) {
+            Assert.assertEquals("previous rotated entrylog should be flushandforcewritten", 0,
+                    rotatedLogChannel.getUnpersistedBytes());
+        }
+    }
+
+    void addEntriesAndRotateLogs(EntryLogger entryLogger, int numOfRotations)
+            throws IOException {
+        EntryLogManagerBase entryLogManager = (EntryLogManagerBase) entryLogger.getEntryLogManager();
+        entryLogManager.setCurrentLogForLedgerAndAddToRotate(EntryLogger.UNASSIGNED_LEDGERID, null);
+        Random rand = new Random();
 
 Review comment:
   yes, removed that

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services