You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by iv...@apache.org on 2018/03/09 11:03:18 UTC
[bookkeeper] branch master updated: Issue #570: Move logic of
unpersistedbytes to bufferedchannel
This is an automated email from the ASF dual-hosted git repository.
ivank pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 30261ea Issue #570: Move logic of unpersistedbytes to bufferedchannel
30261ea is described below
commit 30261eae3fd8ab25239d57cfb86a200d5f7b6b7d
Author: Charan Reddy Guttapalem <cg...@salesforce.com>
AuthorDate: Fri Mar 9 12:03:12 2018 +0100
Issue #570: Move logic of unpersistedbytes to bufferedchannel
Descriptions of the changes in this PR:
This is < sub-task2 > of Issue #570.
https://github.com/apache/bookkeeper/commit/26b09abb4202362ca37d6944ce75eb2a3309dc3c
introduced the flushEntrylogBytes factor. But it is structurally correct to have this
logic in BufferedChannel, rather than in EntryLogger, since it is a parameter of BufferedChannel.
Master Issue: #570
Author: cguttapalem <cg...@salesforce.com>
Reviewers: Ivan Kelly <iv...@apache.org>, Sijie Guo <si...@apache.org>
This closes #1228 from reddycharan/fixunpersistedbytes, closes #570
---
.../apache/bookkeeper/bookie/BufferedChannel.java | 130 +++++++++++++++-----
.../org/apache/bookkeeper/bookie/EntryLogger.java | 40 ++----
.../java/org/apache/bookkeeper/bookie/Journal.java | 2 +-
.../bookkeeper/bookie/BookieJournalTest.java | 12 +-
.../bookkeeper/bookie/BufferedChannelTest.java | 134 +++++++++++++++++++++
.../org/apache/bookkeeper/bookie/UpgradeTest.java | 2 +-
6 files changed, 251 insertions(+), 69 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
index 0d21d41..05a20e5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
@@ -41,20 +41,46 @@ public class BufferedChannel extends BufferedReadChannel implements Closeable {
// The buffer used to write operations.
protected final ByteBuf writeBuffer;
// The absolute position of the next write operation.
- protected volatile long position;
+ protected final AtomicLong position;
+
+ /*
+ * if unpersistedBytesBound is non-zero value, then after writing to
+ * writeBuffer, it will check if the unpersistedBytes is greater than
+ * unpersistedBytesBound and then calls flush method if it is greater.
+ *
+ * It is a best-effort feature, since 'forceWrite' method is not
+ * synchronized and unpersistedBytes is reset in 'forceWrite' method before
+ * calling fileChannel.force
+ */
+ protected final long unpersistedBytesBound;
+
+ /*
+ * it tracks the number of bytes which are not persisted yet by force
+ * writing the FileChannel. The unpersisted bytes could be in writeBuffer or
+ * in fileChannel system cache.
+ */
+ protected final AtomicLong unpersistedBytes;
// make constructor to be public for unit test
public BufferedChannel(FileChannel fc, int capacity) throws IOException {
// Use the same capacity for read and write buffers.
- this(fc, capacity, capacity);
+ this(fc, capacity, 0L);
+ }
+
+ public BufferedChannel(FileChannel fc, int capacity, long unpersistedBytesBound) throws IOException {
+ // Use the same capacity for read and write buffers.
+ this(fc, capacity, capacity, unpersistedBytesBound);
}
- public BufferedChannel(FileChannel fc, int writeCapacity, int readCapacity) throws IOException {
+ public BufferedChannel(FileChannel fc, int writeCapacity, int readCapacity, long unpersistedBytesBound)
+ throws IOException {
super(fc, readCapacity);
this.writeCapacity = writeCapacity;
- this.position = fc.position();
- this.writeBufferStartPosition.set(position);
+ this.position = new AtomicLong(fc.position());
+ this.writeBufferStartPosition.set(position.get());
this.writeBuffer = ByteBufAllocator.DEFAULT.directBuffer(writeCapacity);
+ this.unpersistedBytes = new AtomicLong(0);
+ this.unpersistedBytesBound = unpersistedBytesBound;
}
@Override
@@ -70,20 +96,34 @@ public class BufferedChannel extends BufferedReadChannel implements Closeable {
* @param src The source ByteBuffer which contains the data to be written.
* @throws IOException if a write operation fails.
*/
- public synchronized void write(ByteBuf src) throws IOException {
+ public void write(ByteBuf src) throws IOException {
int copied = 0;
- int len = src.readableBytes();
- while (copied < len) {
- int bytesToCopy = Math.min(src.readableBytes() - copied, writeBuffer.writableBytes());
- writeBuffer.writeBytes(src, src.readerIndex() + copied, bytesToCopy);
- copied += bytesToCopy;
-
- // if we have run out of buffer space, we should flush to the file
- if (!writeBuffer.isWritable()) {
- flushInternal();
+ boolean shouldForceWrite = false;
+ synchronized (this) {
+ int len = src.readableBytes();
+ while (copied < len) {
+ int bytesToCopy = Math.min(src.readableBytes() - copied, writeBuffer.writableBytes());
+ writeBuffer.writeBytes(src, src.readerIndex() + copied, bytesToCopy);
+ copied += bytesToCopy;
+
+ // if we have run out of buffer space, we should flush to the
+ // file
+ if (!writeBuffer.isWritable()) {
+ flush();
+ }
+ }
+ position.addAndGet(copied);
+ unpersistedBytes.addAndGet(copied);
+ if (unpersistedBytesBound > 0) {
+ if (unpersistedBytes.get() >= unpersistedBytesBound) {
+ flush();
+ shouldForceWrite = true;
+ }
}
}
- position += copied;
+ if (shouldForceWrite) {
+ forceWrite(false);
+ }
}
/**
@@ -91,7 +131,7 @@ public class BufferedChannel extends BufferedReadChannel implements Closeable {
* @return
*/
public long position() {
- return position;
+ return position.get();
}
/**
@@ -102,28 +142,27 @@ public class BufferedChannel extends BufferedReadChannel implements Closeable {
return writeBufferStartPosition.get();
}
-
/**
- * Write any data in the buffer to the file. If sync is set to true, force a sync operation so that
- * data is persisted to the disk.
- * @param shouldForceWrite
- * @throws IOException if the write or sync operation fails.
+ * calls both flush and forceWrite methods.
+ *
+ * @param forceMetadata
+ * - If true then this method is required to force changes to
+ * both the file's content and metadata to be written to storage;
+ * otherwise, it need only force content changes to be written
+ * @throws IOException
*/
- public void flush(boolean shouldForceWrite) throws IOException {
- synchronized (this) {
- flushInternal();
- }
- if (shouldForceWrite) {
- forceWrite(false);
- }
+ public void flushAndForceWrite(boolean forceMetadata) throws IOException {
+ flush();
+ forceWrite(forceMetadata);
}
/**
* Write any data in the buffer to the file and advance the writeBufferPosition.
* Callers are expected to synchronize appropriately
+ *
* @throws IOException if the write fails.
*/
- private void flushInternal() throws IOException {
+ public synchronized void flush() throws IOException {
ByteBuffer toWrite = writeBuffer.internalNioBuffer(0, writeBuffer.writerIndex());
do {
fileChannel.write(toWrite);
@@ -132,12 +171,33 @@ public class BufferedChannel extends BufferedReadChannel implements Closeable {
writeBufferStartPosition.set(fileChannel.position());
}
+ /*
+ * force a sync operation so that data is persisted to the disk.
+ */
public long forceWrite(boolean forceMetadata) throws IOException {
// This is the point up to which we had flushed to the file system page cache
// before issuing this force write hence is guaranteed to be made durable by
// the force write, any flush that happens after this may or may
// not be flushed
long positionForceWrite = writeBufferStartPosition.get();
+ /*
+ * since forceWrite method is not called in synchronized block, to make
+ * sure we are not undercounting unpersistedBytes, setting
+ * unpersistedBytes to the current number of bytes in writeBuffer.
+ *
+ * since we are calling fileChannel.force, bytes which are written to
+ * filechannel (system filecache) will be persisted to the disk. So we
+ * dont need to consider those bytes for setting value to
+ * unpersistedBytes.
+ *
+ * In this method fileChannel.force is not called in synchronized block, so
+ * we are doing best efforts to not overcount or undercount unpersistedBytes.
+ * Hence setting writeBuffer.readableBytes() to unpersistedBytes.
+ *
+ */
+ synchronized (this) {
+ unpersistedBytes.set(writeBuffer.readableBytes());
+ }
fileChannel.force(forceMetadata);
return positionForceWrite;
}
@@ -188,4 +248,12 @@ public class BufferedChannel extends BufferedReadChannel implements Closeable {
super.clear();
writeBuffer.clear();
}
-}
+
+ public synchronized int getNumOfBytesInWriteBuffer() {
+ return writeBuffer.readableBytes();
+ }
+
+ long getUnpersistedBytes() {
+ return unpersistedBytes.get();
+ }
+}
\ No newline at end of file
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index df72899..1792417 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -92,8 +92,9 @@ public class EntryLogger {
int writeCapacity,
int readCapacity,
long logId,
- File logFile) throws IOException {
- super(fc, writeCapacity, readCapacity);
+ File logFile,
+ long unpersistedBytesBound) throws IOException {
+ super(fc, writeCapacity, readCapacity, unpersistedBytesBound);
this.logId = logId;
this.entryLogMetadata = new EntryLogMetadata(logId);
this.logFile = logFile;
@@ -197,9 +198,6 @@ public class EntryLogger {
static final int MIN_SANE_ENTRY_SIZE = 8 + 8;
static final long MB = 1024 * 1024;
- private final long flushIntervalInBytes;
- private final boolean doRegularFlushes;
- private long bytesWrittenSinceLastFlush = 0;
private final int maxSaneEntrySize;
final ServerConfiguration conf;
@@ -287,8 +285,6 @@ public class EntryLogger {
this.leastUnflushedLogId = logId + 1;
this.entryLoggerAllocator = new EntryLoggerAllocator(logId);
this.conf = conf;
- flushIntervalInBytes = conf.getFlushIntervalInBytes();
- doRegularFlushes = flushIntervalInBytes > 0;
initialize();
}
@@ -481,7 +477,7 @@ public class EntryLogger {
// flush the internal buffer back to filesystem but not sync disk
// so the readers could access the data from filesystem.
- logChannel.flush(false);
+ logChannel.flush();
// Append ledgers map at the end of entry log
appendLedgersMap(logChannel);
@@ -570,7 +566,7 @@ public class EntryLogger {
}
// Flush the ledger's map out before we write the header.
// Otherwise the header might point to something that is not fully written
- entryLogChannel.flush(false);
+ entryLogChannel.flush();
// Update the headers with the map offset and count of ledgers
ByteBuffer mapInfo = ByteBuffer.allocate(8 + 4);
@@ -668,8 +664,8 @@ public class EntryLogger {
} while (newLogFile == null);
FileChannel channel = new RandomAccessFile(newLogFile, "rw").getChannel();
- BufferedLogChannel logChannel = new BufferedLogChannel(channel,
- conf.getWriteBufferBytes(), conf.getReadBufferBytes(), preallocatedLogId, newLogFile);
+ BufferedLogChannel logChannel = new BufferedLogChannel(channel, conf.getWriteBufferBytes(),
+ conf.getReadBufferBytes(), preallocatedLogId, newLogFile, conf.getFlushIntervalInBytes());
logfileHeader.readerIndex(0);
logChannel.write(logfileHeader);
@@ -817,7 +813,7 @@ public class EntryLogger {
while (chIter.hasNext()) {
BufferedLogChannel channel = chIter.next();
try {
- channel.flush(true);
+ channel.flushAndForceWrite(false);
} catch (IOException ioe) {
// rescue from flush exception, add unflushed channels back
synchronized (this) {
@@ -851,8 +847,7 @@ public class EntryLogger {
synchronized void flushCurrentLog() throws IOException {
if (logChannel != null) {
- logChannel.flush(true);
- bytesWrittenSinceLastFlush = 0;
+ logChannel.flushAndForceWrite(false);
if (LOG.isDebugEnabled()) {
LOG.debug("Flush and sync current entry logger {}.", logChannel.getLogId());
}
@@ -881,9 +876,6 @@ public class EntryLogger {
// Create new log if logSizeLimit reached or current disk is full
boolean createNewLog = shouldCreateNewEntryLog.get();
if (createNewLog || reachEntryLogLimit) {
- if (doRegularFlushes) {
- flushCurrentLog();
- }
createNewLog();
// Reset the flag
if (createNewLog) {
@@ -901,8 +893,6 @@ public class EntryLogger {
logChannel.write(entry);
logChannel.registerWrittenEntry(ledger, entrySize);
- incrementBytesWrittenAndMaybeFlush(4L + entrySize);
-
return (logChannel.getLogId() << 32L) | pos;
}
@@ -928,7 +918,7 @@ public class EntryLogger {
void flushCompactionLog() throws IOException {
synchronized (compactionLogLock) {
if (compactionLogChannel != null) {
- compactionLogChannel.flush(true);
+ compactionLogChannel.flushAndForceWrite(false);
LOG.info("Flushed compaction log file {} with logId.",
compactionLogChannel.getLogFile(),
compactionLogChannel.getLogId());
@@ -970,16 +960,6 @@ public class EntryLogger {
}
}
- private void incrementBytesWrittenAndMaybeFlush(long bytesWritten) throws IOException {
- if (!doRegularFlushes) {
- return;
- }
- bytesWrittenSinceLastFlush += bytesWritten;
- if (bytesWrittenSinceLastFlush > flushIntervalInBytes) {
- flushCurrentLog();
- }
- }
-
static long logIdForOffset(long offset) {
return offset >> 32L;
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 146a83e..775219b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -986,7 +986,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
writePaddingBytes(logFile, paddingBuff, journalAlignmentSize);
}
journalFlushWatcher.reset().start();
- bc.flush(false);
+ bc.flush();
for (int i = 0; i < toFlush.size(); i++) {
QueueEntry entry = toFlush.get(i);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
index 9294e27..5133e78 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
@@ -171,7 +171,7 @@ public class BookieJournalTest {
private static void moveToPosition(JournalChannel jc, long pos) throws IOException {
jc.fc.position(pos);
- jc.bc.position = pos;
+ jc.bc.position.set(pos);
jc.bc.writeBufferStartPosition.set(pos);
}
@@ -211,7 +211,7 @@ public class BookieJournalTest {
bc.write(packet);
packet.release();
}
- bc.flush(true);
+ bc.flushAndForceWrite(false);
updateJournalVersion(jc, JournalChannel.V2);
@@ -245,7 +245,7 @@ public class BookieJournalTest {
bc.write(packet);
packet.release();
}
- bc.flush(true);
+ bc.flushAndForceWrite(false);
updateJournalVersion(jc, JournalChannel.V3);
@@ -284,7 +284,7 @@ public class BookieJournalTest {
lenBuf.writeInt(packet.readableBytes());
bc.write(lenBuf);
bc.write(packet);
- bc.flush(true);
+ bc.flushAndForceWrite(false);
updateJournalVersion(jc, JournalChannel.V4);
return jc;
}
@@ -324,7 +324,7 @@ public class BookieJournalTest {
bc.write(lenBuf);
bc.write(packet);
Journal.writePaddingBytes(jc, paddingBuff, JournalChannel.SECTOR_SIZE);
- bc.flush(true);
+ bc.flushAndForceWrite(false);
updateJournalVersion(jc, JournalChannel.V5);
return jc;
}
@@ -521,7 +521,7 @@ public class BookieJournalTest {
JournalChannel jc = writeV2Journal(Bookie.getCurrentDirectory(journalDir), 0);
jc.getBufferedChannel().write(Unpooled.wrappedBuffer("JunkJunkJunk".getBytes()));
- jc.getBufferedChannel().flush(true);
+ jc.getBufferedChannel().flushAndForceWrite(false);
writeIndexFileForLedger(ledgerDir, 1, "testPasswd".getBytes());
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BufferedChannelTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BufferedChannelTest.java
new file mode 100644
index 0000000..86f3a86
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BufferedChannelTest.java
@@ -0,0 +1,134 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.util.Random;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for BufferedChannel.
+ */
+public class BufferedChannelTest {
+
+ private static Random rand = new Random();
+ private static final int INTERNAL_BUFFER_WRITE_CAPACITY = 65536;
+ private static final int INTERNAL_BUFFER_READ_CAPACITY = 512;
+
+ @Test
+ public void testBufferedChannelWithNoBoundOnUnpersistedBytes() throws Exception {
+ testBufferedChannel(5000, 30, 0, false, false);
+ }
+
+ @Test
+ public void testBufferedChannelWithBoundOnUnpersistedBytes() throws Exception {
+ testBufferedChannel(5000, 30, 5000 * 28, false, false);
+ }
+
+ @Test
+ public void testBufferedChannelWithBoundOnUnpersistedBytesAndFlush() throws Exception {
+ testBufferedChannel(5000, 30, 5000 * 28, true, false);
+ }
+
+ @Test
+ public void testBufferedChannelFlushNoForceWrite() throws Exception {
+ testBufferedChannel(5000, 30, 0, true, false);
+ }
+
+ @Test
+ public void testBufferedChannelForceWriteNoFlush() throws Exception {
+ testBufferedChannel(5000, 30, 0, false, true);
+ }
+
+ @Test
+ public void testBufferedChannelFlushForceWrite() throws Exception {
+ testBufferedChannel(5000, 30, 0, true, true);
+ }
+
+ public void testBufferedChannel(int byteBufLength, int numOfWrites, int unpersistedBytesBound, boolean flush,
+ boolean shouldForceWrite) throws Exception {
+ File newLogFile = File.createTempFile("test", "log");
+ newLogFile.deleteOnExit();
+ FileChannel fileChannel = new RandomAccessFile(newLogFile, "rw").getChannel();
+
+ BufferedChannel logChannel = new BufferedChannel(fileChannel, INTERNAL_BUFFER_WRITE_CAPACITY,
+ INTERNAL_BUFFER_READ_CAPACITY, unpersistedBytesBound);
+
+ ByteBuf dataBuf = generateEntry(byteBufLength);
+ dataBuf.markReaderIndex();
+ dataBuf.markWriterIndex();
+
+ for (int i = 0; i < numOfWrites; i++) {
+ logChannel.write(dataBuf);
+ dataBuf.resetReaderIndex();
+ dataBuf.resetWriterIndex();
+ }
+
+ if (flush && shouldForceWrite) {
+ logChannel.flushAndForceWrite(false);
+ } else if (flush) {
+ logChannel.flush();
+ } else if (shouldForceWrite) {
+ logChannel.forceWrite(false);
+ }
+
+ int expectedNumOfUnpersistedBytes = 0;
+
+ if (flush && shouldForceWrite) {
+ /*
+ * if flush call is made with shouldForceWrite,
+ * then expectedNumOfUnpersistedBytes should be zero.
+ */
+ expectedNumOfUnpersistedBytes = 0;
+ } else if (!flush && shouldForceWrite) {
+ /*
+ * if flush is not called then internal write buffer is not flushed,
+ * but while adding entries to BufferedChannel if writeBuffer has
+ * reached its capacity then it will call flush method, and the data
+ * gets added to the file buffer. So though explicitly we are not
+ * calling flush method, implicitly flush gets called when
+ * writeBuffer reaches its capacity.
+ */
+ expectedNumOfUnpersistedBytes = (byteBufLength * numOfWrites) % INTERNAL_BUFFER_WRITE_CAPACITY;
+ } else {
+ expectedNumOfUnpersistedBytes = (byteBufLength * numOfWrites) - unpersistedBytesBound;
+ }
+
+ Assert.assertEquals("Unpersisted bytes", expectedNumOfUnpersistedBytes, logChannel.getUnpersistedBytes());
+ logChannel.close();
+ fileChannel.close();
+ }
+
+ private static ByteBuf generateEntry(int length) {
+ byte[] data = new byte[length];
+ ByteBuf bb = Unpooled.buffer(length);
+ rand.nextBytes(data);
+ bb.writeBytes(data);
+ return bb;
+ }
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java
index 77f2453..b89a5b0 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java
@@ -105,7 +105,7 @@ public class UpgradeTest extends BookKeeperClusterTestCase {
bc.write(packet);
packet.release();
}
- bc.flush(true);
+ bc.flushAndForceWrite(false);
return jc;
}
--
To stop receiving notification emails like this one, please contact
ivank@apache.org.