You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/02/01 19:21:34 UTC

[GitHub] sijie closed pull request #889: BP-14 Implementation of WriteFlag.DEFERRED_SYNC on Journal

sijie closed pull request #889: BP-14 Implementation of WriteFlag.DEFERRED_SYNC on Journal
URL: https://github.com/apache/bookkeeper/pull/889
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request once it is merged,
the diff is reproduced below for the sake of provenance:

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index da0525acf..e091199b0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -939,7 +939,7 @@ public void run() {
             // wait until journal quits
             for (Journal journal: journals) {
 
-                journal.join();
+                journal.joinThread();
             }
             LOG.info("Journal thread(s) quit.");
         } catch (InterruptedException ie) {
@@ -1070,7 +1070,7 @@ private LedgerDescriptor getLedgerForEntry(ByteBuf entry, final byte[] masterKey
                 bb.put(masterKey);
                 bb.flip();
 
-                getJournal(ledgerId).logAddEntry(bb, new NopWriteCallback(), null);
+                getJournal(ledgerId).logAddEntry(bb, false /* ackBeforeSync */, new NopWriteCallback(), null);
             }
         }
 
@@ -1084,7 +1084,8 @@ private Journal getJournal(long ledgerId) {
     /**
      * Add an entry to a ledger as specified by handle.
      */
-    private void addEntryInternal(LedgerDescriptor handle, ByteBuf entry, WriteCallback cb, Object ctx)
+    private void addEntryInternal(LedgerDescriptor handle, ByteBuf entry,
+                                  boolean ackBeforeSync, WriteCallback cb, Object ctx)
             throws IOException, BookieException {
         long ledgerId = handle.getLedgerId();
         long entryId = handle.addEntry(entry);
@@ -1094,7 +1095,7 @@ private void addEntryInternal(LedgerDescriptor handle, ByteBuf entry, WriteCallb
         if (LOG.isTraceEnabled()) {
             LOG.trace("Adding {}@{}", entryId, ledgerId);
         }
-        getJournal(ledgerId).logAddEntry(entry, cb, ctx);
+        getJournal(ledgerId).logAddEntry(entry, ackBeforeSync, cb, ctx);
     }
 
     /**
@@ -1112,7 +1113,7 @@ public void recoveryAddEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[]
             LedgerDescriptor handle = getLedgerForEntry(entry, masterKey);
             synchronized (handle) {
                 entrySize = entry.readableBytes();
-                addEntryInternal(handle, entry, cb, ctx);
+                addEntryInternal(handle, entry, false /* ackBeforeSync */, cb, ctx);
             }
             success = true;
         } catch (NoWritableLedgerDirException e) {
@@ -1159,7 +1160,7 @@ public ByteBuf getExplicitLac(long ledgerId) throws IOException, Bookie.NoLedger
      * Add entry to a ledger.
      * @throws BookieException.LedgerFencedException if the ledger is fenced
      */
-    public void addEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterKey)
+    public void addEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback cb, Object ctx, byte[] masterKey)
             throws IOException, BookieException.LedgerFencedException, BookieException {
         long requestNanos = MathUtils.nowInNano();
         boolean success = false;
@@ -1172,7 +1173,7 @@ public void addEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterK
                             .create(BookieException.Code.LedgerFencedException);
                 }
                 entrySize = entry.readableBytes();
-                addEntryInternal(handle, entry, cb, ctx);
+                addEntryInternal(handle, entry, ackBeforeSync, cb, ctx);
             }
             success = true;
         } catch (NoWritableLedgerDirException e) {
@@ -1403,7 +1404,7 @@ public static void main(String[] args)
             buff.writeLong(1);
             buff.writeLong(i);
             cb.incCount();
-            b.addEntry(buff, cb, null, new byte[0]);
+            b.addEntry(buff, false /* ackBeforeSync */, cb, null, new byte[0]);
         }
         cb.waitZero();
         long end = MathUtils.now();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 29373d2d8..a2acf76a1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -21,6 +21,7 @@
 
 package org.apache.bookkeeper.bookie;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
 
 import io.netty.buffer.ByteBuf;
@@ -283,17 +284,18 @@ public boolean accept(long journalId) {
         WriteCallback cb;
         Object ctx;
         long enqueueTime;
+        boolean ackBeforeSync;
 
         OpStatsLogger journalAddEntryStats;
         Counter journalCbQueueSize;
 
-        static QueueEntry create(
-                ByteBuf entry, long ledgerId, long entryId, WriteCallback cb, Object ctx,
-                long enqueueTime,
-                OpStatsLogger journalAddEntryStats,
+
+        static QueueEntry create(ByteBuf entry, boolean ackBeforeSync, long ledgerId, long entryId,
+                WriteCallback cb, Object ctx, long enqueueTime, OpStatsLogger journalAddEntryStats,
                 Counter journalCbQueueSize) {
             QueueEntry qe = RECYCLER.get();
             qe.entry = entry;
+            qe.ackBeforeSync = ackBeforeSync;
             qe.cb = cb;
             qe.ctx = ctx;
             qe.ledgerId = ledgerId;
@@ -332,7 +334,11 @@ private void recycle() {
         }
     }
 
-    private class ForceWriteRequest {
+    /**
+     * Token which represents the need to force a write to the Journal.
+     */
+    @VisibleForTesting
+    public class ForceWriteRequest {
         private JournalChannel logFile;
         private RecyclableArrayList<QueueEntry> forceWriteWaiters;
         private boolean shouldClose;
@@ -352,11 +358,16 @@ public int process(boolean shouldForceWrite) throws IOException {
                     this.logFile.forceWrite(false);
                     journalSyncStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
                 }
+                LOG.info("setCurLogMark at {}", lastLogMark.getCurMark());
+                LOG.info("setCurLogMark at {}, {}", this.logId, this.lastFlushedPosition);
                 lastLogMark.setCurLogMark(this.logId, this.lastFlushedPosition);
 
                 // Notify the waiters that the force write succeeded
                 for (int i = 0; i < forceWriteWaiters.size(); i++) {
-                    cbThreadPool.execute(forceWriteWaiters.get(i));
+                    QueueEntry qe = forceWriteWaiters.get(i);
+                    if (qe != null) {
+                        cbThreadPool.execute(qe);
+                    }
                     journalCbQueueSize.inc();
                 }
 
@@ -819,22 +830,28 @@ public boolean accept(long journalId) {
         }
     }
 
-    public void logAddEntry(ByteBuffer entry, WriteCallback cb, Object ctx) {
-        logAddEntry(Unpooled.wrappedBuffer(entry), cb, ctx);
+    public void logAddEntry(ByteBuffer entry, boolean ackBeforeSync, WriteCallback cb, Object ctx) {
+        logAddEntry(Unpooled.wrappedBuffer(entry), ackBeforeSync, cb, ctx);
     }
 
     /**
      * record an add entry operation in journal.
      */
-    public void logAddEntry(ByteBuf entry, WriteCallback cb, Object ctx) {
+    public void logAddEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback cb, Object ctx) {
         long ledgerId = entry.getLong(entry.readerIndex() + 0);
         long entryId = entry.getLong(entry.readerIndex() + 8);
         journalQueueSize.inc();
+        logAddEntry(ledgerId, entryId, entry, ackBeforeSync, cb, ctx);
+    }
 
+    @VisibleForTesting
+    void logAddEntry(long ledgerId, long entryId, ByteBuf entry,
+                     boolean ackBeforeSync, WriteCallback cb, Object ctx) {
         //Retain entry until it gets written to journal
         entry.retain();
+
         queue.add(QueueEntry.create(
-                entry, ledgerId, entryId, cb, ctx, MathUtils.nowInNano(),
+                entry, ackBeforeSync,  ledgerId, entryId, cb, ctx, MathUtils.nowInNano(),
                 journalAddEntryStats, journalQueueSize));
     }
 
@@ -867,6 +884,7 @@ public void run() {
         LOG.info("Starting journal on {}", journalDirectory);
 
         RecyclableArrayList<QueueEntry> toFlush = entryListRecycler.newInstance();
+        int numEntriesToFlush = 0;
         ByteBuf lenBuff = Unpooled.buffer(4);
         ByteBuf paddingBuff = Unpooled.buffer(2 * conf.getJournalAlignmentSize());
         paddingBuff.writeZero(paddingBuff.capacity());
@@ -912,7 +930,7 @@ public void run() {
                                 TimeUnit.NANOSECONDS);
                     }
 
-                    if (toFlush.isEmpty()) {
+                    if (numEntriesToFlush == 0) {
                         qe = queue.take();
                         dequeueStartTime = MathUtils.nowInNano();
                         journalQueueStats.registerSuccessfulEvent(MathUtils.elapsedNanos(qe.enqueueTime),
@@ -970,6 +988,16 @@ public void run() {
                             }
                             journalFlushWatcher.reset().start();
                             bc.flush(false);
+
+                            for (int i = 0; i < toFlush.size(); i++) {
+                                QueueEntry entry = toFlush.get(i);
+                                if (entry != null && (!syncData || entry.ackBeforeSync)) {
+                                    toFlush.set(i, null);
+                                    numEntriesToFlush--;
+                                    cbThreadPool.execute(entry);
+                                }
+                            }
+
                             lastFlushPosition = bc.position();
                             journalFlushStats.registerSuccessfulEvent(
                                     journalFlushWatcher.stop().elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
@@ -977,12 +1005,14 @@ public void run() {
                             // Trace the lifetime of entries through persistence
                             if (LOG.isDebugEnabled()) {
                                 for (QueueEntry e : toFlush) {
-                                    LOG.debug("Written and queuing for flush Ledger: {}  Entry: {}",
-                                              e.ledgerId, e.entryId);
+                                    if (e != null) {
+                                        LOG.debug("Written and queuing for flush Ledger: {}  Entry: {}",
+                                                  e.ledgerId, e.entryId);
+                                    }
                                 }
                             }
 
-                            forceWriteBatchEntriesStats.registerSuccessfulValue(toFlush.size());
+                            forceWriteBatchEntriesStats.registerSuccessfulValue(numEntriesToFlush);
                             forceWriteBatchBytesStats.registerSuccessfulValue(batchSize);
 
                             boolean shouldRolloverJournal = (lastFlushPosition > maxJournalSize);
@@ -992,14 +1022,12 @@ public void run() {
                                 forceWriteRequests.put(createForceWriteRequest(logFile, logId, lastFlushPosition,
                                                                                toFlush, shouldRolloverJournal, false));
                                 toFlush = entryListRecycler.newInstance();
+                                numEntriesToFlush = 0;
                             } else {
                                 // Data is already written on the file (though it might still be in the OS page-cache)
                                 lastLogMark.setCurLogMark(logId, lastFlushPosition);
-                                for (int i = 0; i < toFlush.size(); i++) {
-                                    cbThreadPool.execute(toFlush.get(i));
-                                }
-
                                 toFlush.clear();
+                                numEntriesToFlush = 0;
                                 if (shouldRolloverJournal) {
                                     forceWriteRequests.put(
                                             createForceWriteRequest(
@@ -1044,6 +1072,7 @@ public void run() {
                 qe.entry.release();
 
                 toFlush.add(qe);
+                numEntriesToFlush++;
                 qe = null;
             }
             logFile.close();
@@ -1099,4 +1128,16 @@ private static int fullRead(JournalChannel fc, ByteBuffer bb) throws IOException
         }
         return total;
     }
+
+    /**
+     * Wait for the Journal thread to exit.
+     *
+     * <p>This method exists because {@link Thread#join()} is final and cannot be mocked in tests.
+     *
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     */
+    @VisibleForTesting
+    public void joinThread() throws InterruptedException {
+        join();
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
index 0649bc56a..84730cb76 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
@@ -122,7 +122,7 @@ ByteBuf getExplicitLac() {
             result = logFenceResult = SettableFuture.create();
         }
         ByteBuf entry = createLedgerFenceEntry(ledgerId);
-        journal.logAddEntry(entry, (rc, ledgerId, entryId, addr, ctx) -> {
+        journal.logAddEntry(entry, false /* ackBeforeSync */, (rc, ledgerId, entryId, addr, ctx) -> {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Record fenced state for ledger {} in journal with rc {}",
                         ledgerId, BKException.codeLogger(rc));
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
index db2cd5ec7..34e2c2c3f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
@@ -77,7 +77,7 @@ protected void processPacket() {
             if (add.isRecoveryAdd()) {
                 requestProcessor.bookie.recoveryAddEntry(addData, this, channel, add.getMasterKey());
             } else {
-                requestProcessor.bookie.addEntry(addData, this, channel, add.getMasterKey());
+                requestProcessor.bookie.addEntry(addData, false, this, channel, add.getMasterKey());
             }
         } catch (IOException e) {
             LOG.error("Error writing " + add, e);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
index 2cb1238d8..f227d8828 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
@@ -25,9 +25,11 @@
 import io.netty.channel.Channel;
 
 import java.io.IOException;
+import java.util.EnumSet;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.client.api.WriteFlag;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse;
@@ -101,6 +103,13 @@ public void writeComplete(int rc, long ledgerId, long entryId,
                 sendResponse(status, resp, requestProcessor.addRequestStats);
             }
         };
+        final EnumSet<WriteFlag> writeFlags;
+        if (addRequest.hasWriteFlags()) {
+            writeFlags = WriteFlag.getWriteFlags(addRequest.getWriteFlags());
+        } else {
+            writeFlags = EnumSet.noneOf(WriteFlag.class);
+        }
+        final boolean ackBeforeSync = writeFlags.contains(WriteFlag.DEFERRED_SYNC);
         StatusCode status = null;
         byte[] masterKey = addRequest.getMasterKey().toByteArray();
         ByteBuf entryToAdd = Unpooled.wrappedBuffer(addRequest.getBody().asReadOnlyByteBuffer());
@@ -108,7 +117,7 @@ public void writeComplete(int rc, long ledgerId, long entryId,
             if (addRequest.hasFlag() && addRequest.getFlag().equals(AddRequest.Flag.RECOVERY_ADD)) {
                 requestProcessor.bookie.recoveryAddEntry(entryToAdd, wcb, channel, masterKey);
             } else {
-                requestProcessor.bookie.addEntry(entryToAdd, wcb, channel, masterKey);
+                requestProcessor.bookie.addEntry(entryToAdd, ackBeforeSync, wcb, channel, masterKey);
             }
             status = StatusCode.EOK;
         } catch (IOException e) {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java
new file mode 100644
index 000000000..a55bef77a
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java
@@ -0,0 +1,317 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.File;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.bookie.Journal.ForceWriteRequest;
+import org.apache.bookkeeper.bookie.Journal.LastLogMark;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.test.TestStatsProvider;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+
+/**
+ * Test the bookie journal.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({JournalChannel.class, Journal.class})
+@Slf4j
+public class BookieJournalForceTest {
+
+    private static final ByteBuf DATA = Unpooled.wrappedBuffer(new byte[]{});
+
+    @Rule
+    public TemporaryFolder tempDir = new TemporaryFolder();
+
+    @Test
+    public void testAckAfterSync() throws Exception {
+        File journalDir = tempDir.newFolder();
+        Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(journalDir));
+
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setJournalDirName(journalDir.getPath())
+            .setZkServers(null);
+
+        JournalChannel jc = spy(new JournalChannel(journalDir, 1));
+        whenNew(JournalChannel.class).withAnyArguments().thenReturn(jc);
+
+        LedgerDirsManager ledgerDirsManager = mock(LedgerDirsManager.class);
+        Journal journal = new Journal(journalDir, conf, ledgerDirsManager);
+
+        // machinery to suspend ForceWriteThread
+        CountDownLatch forceWriteThreadSuspendedLatch = new CountDownLatch(1);
+        LinkedBlockingQueue<ForceWriteRequest> supportQueue =
+                enableForceWriteThreadSuspension(forceWriteThreadSuspendedLatch, journal);
+
+        journal.start();
+
+        LogMark lastLogMarkBeforeWrite = journal.getLastLogMark().markLog().getCurMark();
+        CountDownLatch latch = new CountDownLatch(1);
+        long ledgerId = 1;
+        long entryId = 0;
+        journal.logAddEntry(ledgerId, entryId, DATA, false /* ackBeforeSync */, new WriteCallback() {
+            @Override
+            public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx) {
+                latch.countDown();
+            }
+        }, null);
+
+        // logAddEntry should not complete even if ForceWriteThread is suspended
+        // wait that an entry is written to the ForceWriteThread queue
+        while (supportQueue.isEmpty()) {
+            Thread.sleep(100);
+        }
+        assertEquals(1, latch.getCount());
+        assertEquals(1, supportQueue.size());
+
+        // in constructor of JournalChannel we are calling forceWrite(true) but it is not tracked by PowerMock
+        // because the 'spy' is applied only on return from the constructor
+        verify(jc, times(0)).forceWrite(true);
+
+        // let ForceWriteThread work
+        forceWriteThreadSuspendedLatch.countDown();
+
+        // callback should complete now
+        assertTrue(latch.await(20, TimeUnit.SECONDS));
+
+        verify(jc, atLeast(1)).forceWrite(false);
+
+        assertEquals(0, supportQueue.size());
+
+        // verify that log marker advanced
+        LastLogMark lastLogMarkAfterForceWrite = journal.getLastLogMark();
+        assertTrue(lastLogMarkAfterForceWrite.getCurMark().compare(lastLogMarkBeforeWrite) > 0);
+
+        journal.shutdown();
+    }
+
+    /**
+     * Checks that an entry added with {@code ackBeforeSync = true} is acknowledged while the
+     * ForceWriteThread is still suspended, with no forceWrite call and no log mark advance.
+     */
+    @Test
+    public void testAckBeforeSync() throws Exception {
+        File journalDir = tempDir.newFolder();
+        Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(journalDir));
+
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setJournalDirName(journalDir.getPath())
+            .setZkServers(null);
+
+        JournalChannel jc = spy(new JournalChannel(journalDir, 1));
+        whenNew(JournalChannel.class).withAnyArguments().thenReturn(jc);
+
+        LedgerDirsManager ledgerDirsManager = mock(LedgerDirsManager.class);
+        Journal journal = new Journal(journalDir, conf, ledgerDirsManager);
+
+        // machinery to suspend ForceWriteThread
+        CountDownLatch forceWriteThreadSuspendedLatch = new CountDownLatch(1);
+        enableForceWriteThreadSuspension(forceWriteThreadSuspendedLatch, journal);
+        journal.start();
+
+        LogMark lastLogMarkBeforeWrite = journal.getLastLogMark().markLog().getCurMark();
+        CountDownLatch latch = new CountDownLatch(1);
+        long ledgerId = 1;
+        long entryId = 0;
+        journal.logAddEntry(ledgerId, entryId, DATA, true /* ackBeforeSync */,
+            (rc, lid, eid, addr, ctx) -> latch.countDown(), null);
+        // logAddEntry should complete even if ForceWriteThread is suspended; fail on timeout
+        assertTrue(latch.await(20, TimeUnit.SECONDS));
+
+        // in constructor of JournalChannel we are calling forceWrite(true) but it is not tracked by PowerMock
+        // because the 'spy' is applied only on return from the constructor
+        verify(jc, times(0)).forceWrite(true);
+
+        // we are never calling forceWrite
+        verify(jc, times(0)).forceWrite(false);
+
+        // verify that log marker did not advance
+        LastLogMark lastLogMarkAfterForceWrite = journal.getLastLogMark();
+        assertEquals(0, lastLogMarkAfterForceWrite.getCurMark().compare(lastLogMarkBeforeWrite));
+
+        // let the forceWriteThread exit
+        forceWriteThreadSuspendedLatch.countDown();
+
+        journal.shutdown();
+    }
+
+    /**
+     * Checks that a burst of {@code ackBeforeSync = true} entries larger than
+     * journalBufferedEntriesThreshold is acknowledged while the ForceWriteThread is suspended.
+     */
+    @Test
+    public void testAckBeforeSyncWithJournalBufferedEntriesThreshold() throws Exception {
+        File journalDir = tempDir.newFolder();
+        Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(journalDir));
+
+        final int journalBufferedEntriesThreshold = 10;
+        // sending a burst of entries, more than journalBufferedEntriesThreshold
+        final int numEntries = journalBufferedEntriesThreshold + 50;
+
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setJournalDirName(journalDir.getPath())
+            .setJournalBufferedEntriesThreshold(journalBufferedEntriesThreshold)
+            .setZkServers(null);
+
+        JournalChannel jc = spy(new JournalChannel(journalDir, 1));
+        whenNew(JournalChannel.class).withAnyArguments().thenReturn(jc);
+
+        LedgerDirsManager ledgerDirsManager = mock(LedgerDirsManager.class);
+        Journal journal = new Journal(journalDir, conf, ledgerDirsManager);
+
+        // machinery to suspend ForceWriteThread
+        CountDownLatch forceWriteThreadSuspendedLatch = new CountDownLatch(1);
+        enableForceWriteThreadSuspension(forceWriteThreadSuspendedLatch, journal);
+
+        TestStatsProvider testStatsProvider = new TestStatsProvider();
+        Counter flushMaxOutstandingBytesCounter = testStatsProvider.getStatsLogger("test")
+                                                        .getCounter("flushMaxOutstandingBytesCounter");
+        Whitebox.setInternalState(journal, "flushMaxOutstandingBytesCounter", flushMaxOutstandingBytesCounter);
+
+        journal.start();
+
+        LogMark lastLogMarkBeforeWrite = journal.getLastLogMark().markLog().getCurMark();
+        CountDownLatch latch = new CountDownLatch(numEntries);
+        long ledgerId = 1;
+        for (long entryId = 0; entryId < numEntries; entryId++) {
+            journal.logAddEntry(ledgerId, entryId, DATA, true /* ackBeforeSync */,
+                (rc, lid, eid, addr, ctx) -> latch.countDown(), null);
+        }
+
+        // every logAddEntry should complete even if ForceWriteThread is suspended; fail on timeout
+        assertTrue(latch.await(20, TimeUnit.SECONDS));
+
+        // in constructor of JournalChannel we are calling forceWrite(true) but it is not tracked by PowerMock
+        // because the 'spy' is applied only on return from the constructor
+        verify(jc, times(0)).forceWrite(true);
+
+        // anyway we are never calling forceWrite
+        verify(jc, times(0)).forceWrite(false);
+
+        // verify that log marker did not advance
+        LastLogMark lastLogMarkAfterForceWrite = journal.getLastLogMark();
+        assertEquals(0, lastLogMarkAfterForceWrite.getCurMark().compare(lastLogMarkBeforeWrite));
+
+        // let the forceWriteThread exit
+        forceWriteThreadSuspendedLatch.countDown();
+
+        assertTrue(flushMaxOutstandingBytesCounter.get() > 1);
+        journal.shutdown();
+    }
+
+    @Test
+    public void testInterleavedRequests() throws Exception {
+        File journalDir = tempDir.newFolder();
+        Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(journalDir));
+
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setJournalDirName(journalDir.getPath())
+            .setZkServers(null);
+
+        JournalChannel jc = spy(new JournalChannel(journalDir, 1));
+        whenNew(JournalChannel.class).withAnyArguments().thenReturn(jc);
+
+        LedgerDirsManager ledgerDirsManager = mock(LedgerDirsManager.class);
+        Journal journal = new Journal(journalDir, conf, ledgerDirsManager);
+        journal.start();
+
+        final int numEntries = 100;
+        CountDownLatch latchAckBeforeSynch = new CountDownLatch(numEntries);
+        CountDownLatch latchAckAfterSynch = new CountDownLatch(numEntries);
+
+        long ledgerIdAckBeforeSync = 1;
+        long ledgerIdAckAfterSync = 2;
+        for (long entryId = 0; entryId < numEntries; entryId++) {
+            journal.logAddEntry(ledgerIdAckBeforeSync, entryId, DATA, true, new WriteCallback() {
+                @Override
+                public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx) {
+                    latchAckBeforeSynch.countDown();
+                }
+            }, null);
+            journal.logAddEntry(ledgerIdAckAfterSync, entryId, DATA, false, new WriteCallback() {
+                @Override
+                public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx) {
+                    latchAckAfterSynch.countDown();
+                }
+            }, null);
+        }
+        assertTrue(latchAckBeforeSynch.await(20, TimeUnit.SECONDS));
+        assertTrue(latchAckAfterSynch.await(20, TimeUnit.SECONDS));
+
+        // in constructor of JournalChannel we are calling forceWrite(true) but it is not tracked by PowerMock
+        // because the 'spy' is applied only on return from the constructor
+        verify(jc, times(0)).forceWrite(true);
+
+        verify(jc, atLeast(1)).forceWrite(false);
+
+        journal.shutdown();
+    }
+
+    @SuppressWarnings("unchecked")
+    private LinkedBlockingQueue<ForceWriteRequest> enableForceWriteThreadSuspension(
+        CountDownLatch forceWriteThreadSuspendedLatch,
+        Journal journal) throws InterruptedException {
+        LinkedBlockingQueue<ForceWriteRequest> supportQueue = new LinkedBlockingQueue<>();
+        BlockingQueue<ForceWriteRequest> forceWriteRequests = mock(BlockingQueue.class);
+        doAnswer((Answer) (InvocationOnMock iom) -> {
+            supportQueue.put(iom.getArgument(0));
+            return null;
+        }).when(forceWriteRequests).put(any(ForceWriteRequest.class));
+        when(forceWriteRequests.take()).thenAnswer(i -> {
+            // suspend the force write thread
+            forceWriteThreadSuspendedLatch.await();
+            return supportQueue.take();
+        });
+        Whitebox.setInternalState(journal, "forceWriteRequests", forceWriteRequests);
+        return supportQueue;
+    }
+
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieWriteToJournalTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieWriteToJournalTest.java
new file mode 100644
index 000000000..74e8447c9
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieWriteToJournalTest.java
@@ -0,0 +1,130 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.File;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.api.BKException;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+/**
+ * Test the bookie journal.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({Bookie.class})
+@Slf4j
+public class BookieWriteToJournalTest {
+
+    @Rule
+    public TemporaryFolder tempDir = new TemporaryFolder();
+
+    /**
+     * test that Bookie calls correctly Journal.logAddEntry about "ackBeforeSync" parameter.
+     */
+    @Test
+    public void testJournalLogAddEntryCalledCorrectly() throws Exception {
+
+        File journalDir = tempDir.newFolder();
+        Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(journalDir));
+        File ledgerDir = tempDir.newFolder();
+        Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(ledgerDir));
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setJournalDirName(journalDir.getPath())
+            .setLedgerDirNames(new String[]{ledgerDir.getPath()})
+            .setZkServers(null);
+        BookieSocketAddress bookieAddress = Bookie.getBookieAddress(conf);
+        CountDownLatch journalJoinLatch = new CountDownLatch(1);
+        Journal journal = mock(Journal.class);
+        MutableBoolean effectiveAckBeforeSync = new MutableBoolean(false);
+        doAnswer((Answer) (InvocationOnMock iom) -> {
+            ByteBuf entry = iom.getArgument(0);
+            long ledgerId = entry.getLong(entry.readerIndex() + 0);
+            long entryId = entry.getLong(entry.readerIndex() + 8);
+            boolean ackBeforeSync = iom.getArgument(1);
+            WriteCallback callback = iom.getArgument(2);
+            Object ctx = iom.getArgument(3);
+
+            effectiveAckBeforeSync.setValue(ackBeforeSync);
+            callback.writeComplete(BKException.Code.OK, ledgerId, entryId, bookieAddress, ctx);
+            return null;
+        }).when(journal).logAddEntry(any(ByteBuf.class), anyBoolean(), any(WriteCallback.class), any());
+
+        // bookie will continue to work as soon as the journal thread is alive
+        doAnswer((Answer) (InvocationOnMock iom) -> {
+            journalJoinLatch.await();
+            return null;
+        }).when(journal).joinThread();
+
+        whenNew(Journal.class).withAnyArguments().thenReturn(journal);
+
+        Bookie b = new Bookie(conf);
+        b.start();
+
+        long ledgerId = 1;
+        long entryId = 0;
+        Object expectedCtx = "foo";
+        byte[] masterKey = new byte[64];
+        for (boolean ackBeforeSync : new boolean[]{true, false}) {
+            CountDownLatch latch = new CountDownLatch(1);
+            final ByteBuf data = Unpooled.buffer();
+            data.writeLong(ledgerId);
+            data.writeLong(entryId);
+            final long expectedEntryId = entryId;
+            b.addEntry(data, ackBeforeSync, (int rc, long ledgerId1, long entryId1,
+                                             BookieSocketAddress addr, Object ctx) -> {
+                assertSame(expectedCtx, ctx);
+                assertEquals(ledgerId, ledgerId1);
+                assertEquals(expectedEntryId, entryId1);
+                latch.countDown();
+            }, expectedCtx, masterKey);
+            latch.await(30, TimeUnit.SECONDS);
+            assertEquals(ackBeforeSync, effectiveAckBeforeSync.booleanValue());
+            entryId++;
+        }
+        // let bookie exit main thread
+        journalJoinLatch.countDown();
+        b.shutdown();
+    }
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
index 6ae9410b3..b6282443f 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
@@ -332,7 +332,7 @@ public void testIndexPageEvictionWriteOrder() throws Exception {
         b.start();
         for (int i = 1; i <= numLedgers; i++) {
             ByteBuf packet = generateEntry(i, 1);
-            b.addEntry(packet, new Bookie.NopWriteCallback(), null, "passwd".getBytes());
+            b.addEntry(packet, false, new Bookie.NopWriteCallback(), null, "passwd".getBytes());
         }
 
         conf = TestBKConfiguration.newServerConfiguration();
@@ -539,7 +539,8 @@ public void testEntryMemTableFlushFailure() throws Exception {
         // this bookie.addEntry call is required. FileInfo for Ledger 1 would be created with this call.
         // without the fileinfo, 'flushTestSortedLedgerStorage.addEntry' calls will fail
         // because of BOOKKEEPER-965 change.
-        bookie.addEntry(generateEntry(1, 1), new Bookie.NopWriteCallback(), null, "passwd".getBytes());
+        bookie.addEntry(generateEntry(1, 1), false, new Bookie.NopWriteCallback(), null, "passwd".getBytes());
+
         flushTestSortedLedgerStorage.addEntry(generateEntry(1, 2));
         assertFalse("Bookie is expected to be in ReadWrite mode", bookie.isReadOnly());
         assertTrue("EntryMemTable SnapShot is expected to be empty", memTable.snapshot.isEmpty());
@@ -583,7 +584,7 @@ public void testSortedLedgerFlushFailure() throws Exception {
         FlushTestSortedLedgerStorage flushTestSortedLedgerStorage = (FlushTestSortedLedgerStorage) bookie.ledgerStorage;
         EntryMemTable memTable = flushTestSortedLedgerStorage.memTable;
 
-        bookie.addEntry(generateEntry(1, 1), new Bookie.NopWriteCallback(), null, "passwd".getBytes());
+        bookie.addEntry(generateEntry(1, 1), false, new Bookie.NopWriteCallback(), null, "passwd".getBytes());
         flushTestSortedLedgerStorage.addEntry(generateEntry(1, 2));
         assertFalse("Bookie is expected to be in ReadWrite mode", bookie.isReadOnly());
         assertTrue("EntryMemTable SnapShot is expected to be empty", memTable.snapshot.isEmpty());
@@ -598,7 +599,7 @@ public void testSortedLedgerFlushFailure() throws Exception {
         // after flush failure, the bookie is set to readOnly
         assertTrue("Bookie is expected to be in Read mode", bookie.isReadOnly());
         // write fail
-        bookie.addEntry(generateEntry(1, 3), new BookkeeperInternalCallbacks.WriteCallback(){
+        bookie.addEntry(generateEntry(1, 3), false, new BookkeeperInternalCallbacks.WriteCallback(){
             public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx){
                 LOG.info("fail write to bk");
                 assertTrue(rc != OK);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java
index 69f8f6c03..850fe5d7e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java
@@ -89,7 +89,7 @@ public void recoveryAddEntry(ByteBuf entry, WriteCallback cb,
                 }
 
                 @Override
-                public void addEntry(ByteBuf entry, WriteCallback cb,
+                public void addEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback cb,
                                      Object ctx, byte[] masterKey)
                         throws IOException, BookieException {
                     try {
@@ -99,7 +99,7 @@ public void addEntry(ByteBuf entry, WriteCallback cb,
                         // and an exception would spam the logs
                         Thread.currentThread().interrupt();
                     }
-                    super.addEntry(entry, cb, ctx, masterKey);
+                    super.addEntry(entry, ackBeforeSync, cb, ctx, masterKey);
                 }
 
                 @Override
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java
index ad9c352b7..900a7c2a7 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java
@@ -197,7 +197,7 @@ private void startUnauthorizedBookie(ServerConfiguration conf, final CountDownLa
             throws Exception {
         Bookie sBookie = new Bookie(conf) {
             @Override
-            public void addEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterKey)
+            public void addEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback cb, Object ctx, byte[] masterKey)
                     throws IOException, BookieException {
                 try {
                     latch.await();
@@ -221,7 +221,7 @@ public void recoveryAddEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[]
     private void startDeadBookie(ServerConfiguration conf, final CountDownLatch latch) throws Exception {
         Bookie dBookie = new Bookie(conf) {
             @Override
-            public void addEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterKey)
+            public void addEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback cb, Object ctx, byte[] masterKey)
                     throws IOException, BookieException {
                 try {
                     latch.await();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
index cf205c117..109147531 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
@@ -193,7 +193,7 @@ private void ledgerRecoveryWithSlowBookie(int ensembleSize, int writeQuorumSize,
 
         Bookie fakeBookie = new Bookie(conf) {
             @Override
-            public void addEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterKey)
+            public void addEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback cb, Object ctx, byte[] masterKey)
                     throws IOException, BookieException {
                 // drop request to simulate a slow and failed bookie
             }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
index 085a248ea..8e879218c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
@@ -455,9 +455,9 @@ public DelayResponseBookie(ServerConfiguration conf)
         }
 
         @Override
-        public void addEntry(ByteBuf entry, final WriteCallback cb, Object ctx, byte[] masterKey)
-                throws IOException, BookieException {
-            super.addEntry(entry, new WriteCallback() {
+        public void addEntry(ByteBuf entry, boolean ackBeforeSync, final WriteCallback cb,
+                             Object ctx, byte[] masterKey) throws IOException, BookieException {
+            super.addEntry(entry, ackBeforeSync, new WriteCallback() {
                 @Override
                 public void writeComplete(int rc, long ledgerId, long entryId,
                                           BookieSocketAddress addr, Object ctx) {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
index a3b0b0da0..35eb9585f 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
@@ -376,7 +376,7 @@ private BookieSocketAddress replaceBookieWithWriteFailingBookie(LedgerHandle lh)
         ServerConfiguration conf = killBookie(bookieIdx);
         Bookie writeFailingBookie = new Bookie(conf) {
             @Override
-            public void addEntry(ByteBuf entry, WriteCallback cb,
+            public void addEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback cb,
                              Object ctx, byte[] masterKey)
                              throws IOException, BookieException {
                 try {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java
index 6e551390a..b11d33605 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java
@@ -187,7 +187,7 @@ public void writeComplete(int rc, long ledgerId, long entryId,
                 bytes.position(0);
                 bytes.limit(bytes.capacity());
                 throttle.acquire();
-                bookie.addEntry(Unpooled.wrappedBuffer(bytes), cb, counter, zeros);
+                bookie.addEntry(Unpooled.wrappedBuffer(bytes), false, cb, counter, zeros);
             }
         }
         long finish = System.currentTimeMillis();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services