You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2019/10/31 10:19:39 UTC
[bookkeeper] branch branch-4.10 updated: Removed mutex contention on BufferedChannel
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch branch-4.10
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/branch-4.10 by this push:
new 290d1cf Removed mutex contention on BufferedChannel
290d1cf is described below
commit 290d1cf6451b534426152bc49faad533d8384600
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Oct 31 03:17:58 2019 -0700
Removed mutex contention on BufferedChannel
### Motivation
Contention on `BufferedChannel` between the journal thread and the ForceWriteThread was introduced in #1228.
`unpersistedBytes` is only used if `unpersistedBytesBound > 0`, which is not true by default. We shouldn't be paying the penalty if not needed.
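Also, `position` doesn't need to be an `AtomicLong`, since it's only updated while the mutex is held, so its updates never race with each other. Declaring it as a `volatile long` still makes the latest value visible to readers that call `position()` without the mutex, with less overhead.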
Reviewers: Enrico Olivelli <eo...@gmail.com>, Jia Zhai <zh...@apache.org>
This closes #2187 from merlimat/buffered-channel-contention
---
.../apache/bookkeeper/bookie/BufferedChannel.java | 19 +++++++++--------
.../bookkeeper/bookie/BookieJournalTest.java | 2 +-
.../bookkeeper/bookie/BufferedChannelTest.java | 4 +++-
.../org/apache/bookkeeper/bookie/EntryLogTest.java | 24 +++++++++++++---------
4 files changed, 29 insertions(+), 20 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
index 31fb203..a5c1ad7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
@@ -42,7 +42,7 @@ public class BufferedChannel extends BufferedReadChannel implements Closeable {
// The buffer used to write operations.
protected final ByteBuf writeBuffer;
// The absolute position of the next write operation.
- protected final AtomicLong position;
+ protected volatile long position;
/*
* if unpersistedBytesBound is non-zero value, then after writing to
@@ -81,8 +81,8 @@ public class BufferedChannel extends BufferedReadChannel implements Closeable {
long unpersistedBytesBound) throws IOException {
super(fc, readCapacity);
this.writeCapacity = writeCapacity;
- this.position = new AtomicLong(fc.position());
- this.writeBufferStartPosition.set(position.get());
+ this.position = fc.position();
+ this.writeBufferStartPosition.set(position);
this.writeBuffer = allocator.directBuffer(writeCapacity);
this.unpersistedBytes = new AtomicLong(0);
this.unpersistedBytesBound = unpersistedBytesBound;
@@ -123,9 +123,9 @@ public class BufferedChannel extends BufferedReadChannel implements Closeable {
flush();
}
}
- position.addAndGet(copied);
- unpersistedBytes.addAndGet(copied);
+ position += copied;
if (doRegularFlushes) {
+ unpersistedBytes.addAndGet(copied);
if (unpersistedBytes.get() >= unpersistedBytesBound) {
flush();
shouldForceWrite = true;
@@ -142,7 +142,7 @@ public class BufferedChannel extends BufferedReadChannel implements Closeable {
* @return
*/
public long position() {
- return position.get();
+ return position;
}
/**
@@ -221,9 +221,12 @@ public class BufferedChannel extends BufferedReadChannel implements Closeable {
* Hence setting writeBuffer.readableBytes() to unpersistedBytes.
*
*/
- synchronized (this) {
- unpersistedBytes.set(writeBuffer.readableBytes());
+ if (unpersistedBytesBound > 0) {
+ synchronized (this) {
+ unpersistedBytes.set(writeBuffer.readableBytes());
+ }
}
+
fileChannel.force(forceMetadata);
return positionForceWrite;
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
index e8d004c..15589e3 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
@@ -172,7 +172,7 @@ public class BookieJournalTest {
private static void moveToPosition(JournalChannel jc, long pos) throws IOException {
jc.fc.position(pos);
- jc.bc.position.set(pos);
+ jc.bc.position = pos;
jc.bc.writeBufferStartPosition.set(pos);
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BufferedChannelTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BufferedChannelTest.java
index c98663d..f6f19d5 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BufferedChannelTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BufferedChannelTest.java
@@ -121,7 +121,9 @@ public class BufferedChannelTest {
expectedNumOfUnpersistedBytes = (byteBufLength * numOfWrites) - unpersistedBytesBound;
}
- Assert.assertEquals("Unpersisted bytes", expectedNumOfUnpersistedBytes, logChannel.getUnpersistedBytes());
+ if (unpersistedBytesBound > 0) {
+ Assert.assertEquals("Unpersisted bytes", expectedNumOfUnpersistedBytes, logChannel.getUnpersistedBytes());
+ }
logChannel.close();
fileChannel.close();
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
index e1f3582..ed52982 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
@@ -526,16 +526,20 @@ public class EntryLogTest {
Assert.assertFalse("Exception happened in one of the operation", exceptionHappened.get());
- /*
- * if flush of the previous current channel is called then the
- * unpersistedBytes should be less than what it was before, actually it
- * would be close to zero (but when new log is created with addEntry
- * call, ledgers map will be appended at the end of entry log)
- */
- Assert.assertTrue(
- "previous currentChannel unpersistedBytes should be less than " + currentActiveChannelUnpersistedBytes
- + ", but it is actually " + currentActiveChannel.getUnpersistedBytes(),
- currentActiveChannel.getUnpersistedBytes() < currentActiveChannelUnpersistedBytes);
+ if (conf.getFlushIntervalInBytes() > 0) {
+ /*
+ * if flush of the previous current channel is called then the
+ * unpersistedBytes should be less than what it was before, actually
+ * it would be close to zero (but when new log is created with
+ * addEntry call, ledgers map will be appended at the end of entry
+ * log)
+ */
+ Assert.assertTrue(
+ "previous currentChannel unpersistedBytes should be less than "
+ + currentActiveChannelUnpersistedBytes
+ + ", but it is actually " + currentActiveChannel.getUnpersistedBytes(),
+ currentActiveChannel.getUnpersistedBytes() < currentActiveChannelUnpersistedBytes);
+ }
for (BufferedLogChannel rotatedLogChannel : rotatedLogChannels) {
Assert.assertEquals("previous rotated entrylog should be flushandforcewritten", 0,
rotatedLogChannel.getUnpersistedBytes());