You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2019/10/31 10:19:39 UTC

[bookkeeper] branch branch-4.10 updated: Removed mutex contention on BufferedChannel

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch branch-4.10
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/branch-4.10 by this push:
     new 290d1cf  Removed mutex contention on BufferedChannel
290d1cf is described below

commit 290d1cf6451b534426152bc49faad533d8384600
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Oct 31 03:17:58 2019 -0700

    Removed mutex contention on BufferedChannel
    
    ### Motivation
    
    Contention on `BufferedChannel` between the journal thread and the ForceWriteThread was introduced in #1228.
    
    `unpersistedBytes` is only used if `unpersistedBytesBound > 0`, which is not the case by default. We shouldn't pay the synchronization penalty when it is not needed.
    
    Also, `position` doesn't need to be an `AtomicLong` since it's only updated while the mutex is held. Making it a `volatile long` has the same effect with less overhead.
    
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Jia Zhai <zh...@apache.org>
    
    This closes #2187 from merlimat/buffered-channel-contention
---
 .../apache/bookkeeper/bookie/BufferedChannel.java  | 19 +++++++++--------
 .../bookkeeper/bookie/BookieJournalTest.java       |  2 +-
 .../bookkeeper/bookie/BufferedChannelTest.java     |  4 +++-
 .../org/apache/bookkeeper/bookie/EntryLogTest.java | 24 +++++++++++++---------
 4 files changed, 29 insertions(+), 20 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
index 31fb203..a5c1ad7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
@@ -42,7 +42,7 @@ public class BufferedChannel extends BufferedReadChannel implements Closeable {
     // The buffer used to write operations.
     protected final ByteBuf writeBuffer;
     // The absolute position of the next write operation.
-    protected final AtomicLong position;
+    protected volatile long position;
 
     /*
      * if unpersistedBytesBound is non-zero value, then after writing to
@@ -81,8 +81,8 @@ public class BufferedChannel extends BufferedReadChannel implements Closeable {
             long unpersistedBytesBound) throws IOException {
         super(fc, readCapacity);
         this.writeCapacity = writeCapacity;
-        this.position = new AtomicLong(fc.position());
-        this.writeBufferStartPosition.set(position.get());
+        this.position = fc.position();
+        this.writeBufferStartPosition.set(position);
         this.writeBuffer = allocator.directBuffer(writeCapacity);
         this.unpersistedBytes = new AtomicLong(0);
         this.unpersistedBytesBound = unpersistedBytesBound;
@@ -123,9 +123,9 @@ public class BufferedChannel extends BufferedReadChannel implements Closeable {
                     flush();
                 }
             }
-            position.addAndGet(copied);
-            unpersistedBytes.addAndGet(copied);
+            position += copied;
             if (doRegularFlushes) {
+                unpersistedBytes.addAndGet(copied);
                 if (unpersistedBytes.get() >= unpersistedBytesBound) {
                     flush();
                     shouldForceWrite = true;
@@ -142,7 +142,7 @@ public class BufferedChannel extends BufferedReadChannel implements Closeable {
      * @return
      */
     public long position() {
-        return position.get();
+        return position;
     }
 
     /**
@@ -221,9 +221,12 @@ public class BufferedChannel extends BufferedReadChannel implements Closeable {
          * Hence setting writeBuffer.readableBytes() to unpersistedBytes.
          *
          */
-        synchronized (this) {
-            unpersistedBytes.set(writeBuffer.readableBytes());
+        if (unpersistedBytesBound > 0) {
+            synchronized (this) {
+                unpersistedBytes.set(writeBuffer.readableBytes());
+            }
         }
+
         fileChannel.force(forceMetadata);
         return positionForceWrite;
     }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
index e8d004c..15589e3 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
@@ -172,7 +172,7 @@ public class BookieJournalTest {
 
     private static void moveToPosition(JournalChannel jc, long pos) throws IOException {
         jc.fc.position(pos);
-        jc.bc.position.set(pos);
+        jc.bc.position = pos;
         jc.bc.writeBufferStartPosition.set(pos);
     }
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BufferedChannelTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BufferedChannelTest.java
index c98663d..f6f19d5 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BufferedChannelTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BufferedChannelTest.java
@@ -121,7 +121,9 @@ public class BufferedChannelTest {
             expectedNumOfUnpersistedBytes = (byteBufLength * numOfWrites) - unpersistedBytesBound;
         }
 
-        Assert.assertEquals("Unpersisted bytes", expectedNumOfUnpersistedBytes, logChannel.getUnpersistedBytes());
+        if (unpersistedBytesBound > 0) {
+            Assert.assertEquals("Unpersisted bytes", expectedNumOfUnpersistedBytes, logChannel.getUnpersistedBytes());
+        }
         logChannel.close();
         fileChannel.close();
     }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
index e1f3582..ed52982 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
@@ -526,16 +526,20 @@ public class EntryLogTest {
 
         Assert.assertFalse("Exception happened in one of the operation", exceptionHappened.get());
 
-        /*
-         * if flush of the previous current channel is called then the
-         * unpersistedBytes should be less than what it was before, actually it
-         * would be close to zero (but when new log is created with addEntry
-         * call, ledgers map will be appended at the end of entry log)
-         */
-        Assert.assertTrue(
-                "previous currentChannel unpersistedBytes should be less than " + currentActiveChannelUnpersistedBytes
-                        + ", but it is actually " + currentActiveChannel.getUnpersistedBytes(),
-                currentActiveChannel.getUnpersistedBytes() < currentActiveChannelUnpersistedBytes);
+        if (conf.getFlushIntervalInBytes() > 0) {
+            /*
+             * if flush of the previous current channel is called then the
+             * unpersistedBytes should be less than what it was before, actually
+             * it would be close to zero (but when new log is created with
+             * addEntry call, ledgers map will be appended at the end of entry
+             * log)
+             */
+            Assert.assertTrue(
+                    "previous currentChannel unpersistedBytes should be less than "
+                            + currentActiveChannelUnpersistedBytes
+                            + ", but it is actually " + currentActiveChannel.getUnpersistedBytes(),
+                    currentActiveChannel.getUnpersistedBytes() < currentActiveChannelUnpersistedBytes);
+        }
         for (BufferedLogChannel rotatedLogChannel : rotatedLogChannels) {
             Assert.assertEquals("previous rotated entrylog should be flushandforcewritten", 0,
                     rotatedLogChannel.getUnpersistedBytes());