You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ms...@apache.org on 2019/12/07 06:39:55 UTC
[incubator-ratis] branch master updated: RATIS-767.
DirectByteBuffers leaked by BufferedWriteChannel in SegmentRaftLog.
Contributed by Mukul Kumar Singh.
This is an automated email from the ASF dual-hosted git repository.
msingh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 284db7f RATIS-767. DirectByteBuffers leaked by BufferedWriteChannel in SegmentRaftLog. Contributed by Mukul Kumar Singh.
284db7f is described below
commit 284db7f50f59bbbac11c11f6f2ed847fedb59472
Author: Mukul Kumar Singh <mu...@cloudera.com>
AuthorDate: Sat Dec 7 12:09:33 2019 +0530
RATIS-767. DirectByteBuffers leaked by BufferedWriteChannel in SegmentRaftLog. Contributed by Mukul Kumar Singh.
---
.../raftlog/segmented/BufferedWriteChannel.java | 4 +--
.../segmented/SegmentedRaftLogOutputStream.java | 4 +--
.../raftlog/segmented/SegmentedRaftLogWorker.java | 30 +++++++++++++++-------
.../statemachine/SimpleStateMachine4Testing.java | 3 ++-
.../server/raftlog/segmented/TestLogSegment.java | 9 ++++---
.../raftlog/segmented/TestRaftLogReadWrite.java | 12 ++++-----
.../raftlog/segmented/TestSegmentedRaftLog.java | 3 ++-
7 files changed, 40 insertions(+), 25 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
index 86b45b5..96c283d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
@@ -34,11 +34,11 @@ public class BufferedWriteChannel extends BufferedChannelBase {
/** Are all the data already flushed? */
private boolean flushed = true;
- public BufferedWriteChannel(FileChannel fc, int writeCapacity)
+ public BufferedWriteChannel(FileChannel fc, ByteBuffer byteBuffer)
throws IOException {
super(fc);
this.position = fc.position();
- this.writeBuffer = ByteBuffer.allocateDirect(writeCapacity);
+ this.writeBuffer = byteBuffer;
}
/**
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
index d042f33..8ecdfc9 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
@@ -57,7 +57,7 @@ public class SegmentedRaftLogOutputStream implements Closeable {
private long preallocatedPos;
public SegmentedRaftLogOutputStream(File file, boolean append, long segmentMaxSize,
- long preallocatedSize, int bufferSize)
+ long preallocatedSize, ByteBuffer byteBuffer)
throws IOException {
this.file = file;
this.checksum = new PureJavaCrc32C();
@@ -69,7 +69,7 @@ public class SegmentedRaftLogOutputStream implements Closeable {
fc.position(fc.size());
preallocatedPos = fc.size();
- out = new BufferedWriteChannel(fc, bufferSize);
+ out = new BufferedWriteChannel(fc, byteBuffer);
if (!append) {
create();
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index 1845912..d944c84 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.Objects;
import java.util.Optional;
@@ -146,6 +147,7 @@ class SegmentedRaftLogWorker implements Runnable {
private final Timer raftLogQueueingTimer;
private final Timer raftLogEnqueueingDelayTimer;
private final RaftLogMetrics raftLogMetrics;
+ private final ByteBuffer writeBuffer;
/**
* The number of entries that have been written into the SegmentedRaftLogOutputStream but
@@ -199,6 +201,8 @@ class SegmentedRaftLogWorker implements Runnable {
this.raftLogSyncTimer = metricRegistry.getRaftLogSyncTimer();
this.raftLogQueueingTimer = metricRegistry.getRaftLogQueueTimer();
this.raftLogEnqueueingDelayTimer = metricRegistry.getRaftLogEnqueueDelayTimer();
+
+ this.writeBuffer = ByteBuffer.allocateDirect(bufferSize);
}
void start(long latestIndex, File openSegmentFile) throws IOException {
@@ -207,8 +211,7 @@ class SegmentedRaftLogWorker implements Runnable {
flushIndex.setUnconditionally(latestIndex, infoIndexChange);
if (openSegmentFile != null) {
Preconditions.assertTrue(openSegmentFile.exists());
- out = new SegmentedRaftLogOutputStream(openSegmentFile, true, segmentMaxSize,
- preallocatedSize, bufferSize);
+ allocateSegmentedRaftLogOutputStream(openSegmentFile, true);
}
workerThread.start();
}
@@ -520,8 +523,7 @@ class SegmentedRaftLogWorker implements Runnable {
@Override
public void execute() throws IOException {
- IOUtils.cleanup(LOG, out);
- out = null;
+ freeSegmentedRaftLogOutputStream();
File openFile = storage.getStorageDir().getOpenLogFile(startIndex);
Preconditions.assertTrue(openFile.exists(),
@@ -570,9 +572,8 @@ class SegmentedRaftLogWorker implements Runnable {
File openFile = storage.getStorageDir().getOpenLogFile(newStartIndex);
Preconditions.assertTrue(!openFile.exists(), "open file %s exists for %s",
openFile, name);
- Preconditions.assertTrue(out == null && pendingFlushNum == 0);
- out = new SegmentedRaftLogOutputStream(openFile, false, segmentMaxSize,
- preallocatedSize, bufferSize);
+ Preconditions.assertTrue(pendingFlushNum == 0);
+ allocateSegmentedRaftLogOutputStream(openFile, false);
Preconditions.assertTrue(openFile.exists(), "Failed to create file %s for %s",
openFile.getAbsolutePath(), name);
LOG.info("{}: created new log segment {}", name, openFile);
@@ -596,8 +597,7 @@ class SegmentedRaftLogWorker implements Runnable {
@Override
void execute() throws IOException {
- IOUtils.cleanup(null, out);
- out = null;
+ freeSegmentedRaftLogOutputStream();
CompletableFuture<Void> stateMachineFuture = null;
if (stateMachine != null) {
stateMachineFuture = stateMachine.truncateStateMachineData(truncateIndex);
@@ -672,4 +672,16 @@ class SegmentedRaftLogWorker implements Runnable {
long getFlushIndex() {
return flushIndex.get();
}
+
+ private void freeSegmentedRaftLogOutputStream() {
+ IOUtils.cleanup(LOG, out);
+ out = null;
+ Preconditions.assertTrue(writeBuffer.position() == 0);
+ }
+
+ private void allocateSegmentedRaftLogOutputStream(File file, boolean append) throws IOException {
+ Preconditions.assertTrue(out == null && writeBuffer.position() == 0);
+ out = new SegmentedRaftLogOutputStream(file, append, segmentMaxSize,
+ preallocatedSize, writeBuffer);
+ }
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index 5fb0312..32ab38a 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -51,6 +51,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
@@ -255,7 +256,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
LOG.debug("Taking a snapshot with t:{}, i:{}, file:{}", termIndex.getTerm(),
termIndex.getIndex(), snapshotFile);
try (SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(snapshotFile, false,
- segmentMaxSize, preallocatedSize, bufferSize)) {
+ segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize))) {
for (final LogEntryProto entry : indexMap.values()) {
if (entry.getIndex() > endIndex) {
break;
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
index 0e9eb9f..032c758 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
@@ -42,6 +42,7 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
@@ -95,7 +96,7 @@ public class TestLogSegment extends BaseTest {
final LogEntryProto[] entries = new LogEntryProto[numEntries];
try (SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(file, false,
- segmentMaxSize, preallocatedSize, bufferSize)) {
+ segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize))) {
for (int i = 0; i < entries.length; i++) {
SimpleOperation op = new SimpleOperation("m" + i);
entries[i] = ServerProtoUtils.toLogEntryProto(op.getLogEntryContent(), term, i + startIndex);
@@ -295,7 +296,7 @@ public class TestLogSegment extends BaseTest {
// make sure preallocation is correct with different max/pre-allocated size
for (int max : maxSizes) {
for (int a : preallocated) {
- try(SegmentedRaftLogOutputStream ignored = new SegmentedRaftLogOutputStream(file, false, max, a, bufferSize)) {
+ try(SegmentedRaftLogOutputStream ignored = new SegmentedRaftLogOutputStream(file, false, max, a, ByteBuffer.allocateDirect(bufferSize))) {
Assert.assertEquals("max=" + max + ", a=" + a, file.length(), Math.min(max, a));
}
try(SegmentedRaftLogInputStream in = new SegmentedRaftLogInputStream(file, 0, INVALID_LOG_INDEX, true)) {
@@ -310,7 +311,7 @@ public class TestLogSegment extends BaseTest {
Arrays.fill(content, (byte) 1);
final long size;
try (SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(file, false,
- 1024, 1024, bufferSize)) {
+ 1024, 1024, ByteBuffer.allocateDirect(bufferSize))) {
SimpleOperation op = new SimpleOperation(new String(content));
LogEntryProto entry = ServerProtoUtils.toLogEntryProto(op.getLogEntryContent(), 0, 0);
size = LogSegment.getEntrySize(entry);
@@ -345,7 +346,7 @@ public class TestLogSegment extends BaseTest {
long totalSize = SegmentedRaftLogFormat.getHeaderLength();
long preallocated = 16 * 1024;
try (SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(file, false,
- max.getSize(), 16 * 1024, 10 * 1024)) {
+ max.getSize(), 16 * 1024, ByteBuffer.allocateDirect(10 * 1024))) {
Assert.assertEquals(preallocated, file.length());
while (totalSize + entrySize < max.getSize()) {
totalSize += entrySize;
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestRaftLogReadWrite.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestRaftLogReadWrite.java
index 7edf1c0..c4b64c0 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestRaftLogReadWrite.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestRaftLogReadWrite.java
@@ -108,7 +108,7 @@ public class TestRaftLogReadWrite extends BaseTest {
final LogEntryProto[] entries = new LogEntryProto[100];
try (SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(openSegment, false,
- segmentMaxSize, preallocatedSize, bufferSize)) {
+ segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize))) {
size += writeMessages(entries, out);
} finally {
storage.close();
@@ -127,7 +127,7 @@ public class TestRaftLogReadWrite extends BaseTest {
File openSegment = storage.getStorageDir().getOpenLogFile(0);
LogEntryProto[] entries = new LogEntryProto[200];
try (SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(openSegment, false,
- segmentMaxSize, preallocatedSize, bufferSize)) {
+ segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize))) {
for (int i = 0; i < 100; i++) {
SimpleOperation m = new SimpleOperation("m" + i);
entries[i] = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i);
@@ -136,7 +136,7 @@ public class TestRaftLogReadWrite extends BaseTest {
}
try (SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(openSegment, true,
- segmentMaxSize, preallocatedSize, bufferSize)) {
+ segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize))) {
for (int i = 100; i < 200; i++) {
SimpleOperation m = new SimpleOperation("m" + i);
entries[i] = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i);
@@ -163,7 +163,7 @@ public class TestRaftLogReadWrite extends BaseTest {
LogEntryProto[] entries = new LogEntryProto[100];
final SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(openSegment, false,
- segmentMaxSize, preallocatedSize, bufferSize);
+ segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize));
size += writeMessages(entries, out);
out.flush();
@@ -192,7 +192,7 @@ public class TestRaftLogReadWrite extends BaseTest {
LogEntryProto[] entries = new LogEntryProto[10];
final SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(openSegment, false,
- 16 * 1024 * 1024, 4 * 1024 * 1024, bufferSize);
+ 16 * 1024 * 1024, 4 * 1024 * 1024, ByteBuffer.allocateDirect(bufferSize));
for (int i = 0; i < 10; i++) {
SimpleOperation m = new SimpleOperation("m" + i);
entries[i] = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i);
@@ -239,7 +239,7 @@ public class TestRaftLogReadWrite extends BaseTest {
RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR);
File openSegment = storage.getStorageDir().getOpenLogFile(0);
try (SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(openSegment, false,
- segmentMaxSize, preallocatedSize, bufferSize)) {
+ segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize))) {
for (int i = 0; i < 100; i++) {
LogEntryProto entry = ServerProtoUtils.toLogEntryProto(
new SimpleOperation("m" + i).getLogEntryContent(), 0, i);
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index 9bc27ff..8b686fb 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -52,6 +52,7 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
@@ -142,7 +143,7 @@ public class TestSegmentedRaftLog extends BaseTest {
final int size = (int) (range.end - range.start + 1);
LogEntryProto[] entries = new LogEntryProto[size];
try (SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(file, false,
- segmentMaxSize, preallocatedSize, bufferSize)) {
+ segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize))) {
for (int i = 0; i < size; i++) {
SimpleOperation m = new SimpleOperation("m" + (i + range.start));
entries[i] = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), range.term, i + range.start);