You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ms...@apache.org on 2019/12/07 06:39:55 UTC

[incubator-ratis] branch master updated: RATIS-767. DirectByteBuffers leaked by BufferedWriteChannel in SegmentRaftLog. Contributed by Mukul Kumar Singh.

This is an automated email from the ASF dual-hosted git repository.

msingh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 284db7f  RATIS-767. DirectByteBuffers leaked by BufferedWriteChannel in SegmentRaftLog. Contributed by Mukul Kumar Singh.
284db7f is described below

commit 284db7f50f59bbbac11c11f6f2ed847fedb59472
Author: Mukul Kumar Singh <mu...@cloudera.com>
AuthorDate: Sat Dec 7 12:09:33 2019 +0530

    RATIS-767. DirectByteBuffers leaked by BufferedWriteChannel in SegmentRaftLog. Contributed by Mukul Kumar Singh.
---
 .../raftlog/segmented/BufferedWriteChannel.java    |  4 +--
 .../segmented/SegmentedRaftLogOutputStream.java    |  4 +--
 .../raftlog/segmented/SegmentedRaftLogWorker.java  | 30 +++++++++++++++-------
 .../statemachine/SimpleStateMachine4Testing.java   |  3 ++-
 .../server/raftlog/segmented/TestLogSegment.java   |  9 ++++---
 .../raftlog/segmented/TestRaftLogReadWrite.java    | 12 ++++-----
 .../raftlog/segmented/TestSegmentedRaftLog.java    |  3 ++-
 7 files changed, 40 insertions(+), 25 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
index 86b45b5..96c283d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
@@ -34,11 +34,11 @@ public class BufferedWriteChannel extends BufferedChannelBase {
   /** Are all the data already flushed? */
   private boolean flushed = true;
 
-  public BufferedWriteChannel(FileChannel fc, int writeCapacity)
+  public BufferedWriteChannel(FileChannel fc, ByteBuffer byteBuffer)
       throws IOException {
     super(fc);
     this.position = fc.position();
-    this.writeBuffer = ByteBuffer.allocateDirect(writeCapacity);
+    this.writeBuffer = byteBuffer;
   }
 
   /**
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
index d042f33..8ecdfc9 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
@@ -57,7 +57,7 @@ public class SegmentedRaftLogOutputStream implements Closeable {
   private long preallocatedPos;
 
   public SegmentedRaftLogOutputStream(File file, boolean append, long segmentMaxSize,
-      long preallocatedSize, int bufferSize)
+      long preallocatedSize, ByteBuffer byteBuffer)
       throws IOException {
     this.file = file;
     this.checksum = new PureJavaCrc32C();
@@ -69,7 +69,7 @@ public class SegmentedRaftLogOutputStream implements Closeable {
       fc.position(fc.size());
       preallocatedPos = fc.size();
 
-      out = new BufferedWriteChannel(fc, bufferSize);
+      out = new BufferedWriteChannel(fc, byteBuffer);
       if (!append) {
         create();
       }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index 1845912..d944c84 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.LinkedList;
 import java.util.Objects;
 import java.util.Optional;
@@ -146,6 +147,7 @@ class SegmentedRaftLogWorker implements Runnable {
   private final Timer raftLogQueueingTimer;
   private final Timer raftLogEnqueueingDelayTimer;
   private final RaftLogMetrics raftLogMetrics;
+  private final ByteBuffer writeBuffer;
 
   /**
    * The number of entries that have been written into the SegmentedRaftLogOutputStream but
@@ -199,6 +201,8 @@ class SegmentedRaftLogWorker implements Runnable {
     this.raftLogSyncTimer = metricRegistry.getRaftLogSyncTimer();
     this.raftLogQueueingTimer = metricRegistry.getRaftLogQueueTimer();
     this.raftLogEnqueueingDelayTimer = metricRegistry.getRaftLogEnqueueDelayTimer();
+
+    this.writeBuffer = ByteBuffer.allocateDirect(bufferSize);
   }
 
   void start(long latestIndex, File openSegmentFile) throws IOException {
@@ -207,8 +211,7 @@ class SegmentedRaftLogWorker implements Runnable {
     flushIndex.setUnconditionally(latestIndex, infoIndexChange);
     if (openSegmentFile != null) {
       Preconditions.assertTrue(openSegmentFile.exists());
-      out = new SegmentedRaftLogOutputStream(openSegmentFile, true, segmentMaxSize,
-          preallocatedSize, bufferSize);
+      allocateSegmentedRaftLogOutputStream(openSegmentFile, true);
     }
     workerThread.start();
   }
@@ -520,8 +523,7 @@ class SegmentedRaftLogWorker implements Runnable {
 
     @Override
     public void execute() throws IOException {
-      IOUtils.cleanup(LOG, out);
-      out = null;
+      freeSegmentedRaftLogOutputStream();
 
       File openFile = storage.getStorageDir().getOpenLogFile(startIndex);
       Preconditions.assertTrue(openFile.exists(),
@@ -570,9 +572,8 @@ class SegmentedRaftLogWorker implements Runnable {
       File openFile = storage.getStorageDir().getOpenLogFile(newStartIndex);
       Preconditions.assertTrue(!openFile.exists(), "open file %s exists for %s",
           openFile, name);
-      Preconditions.assertTrue(out == null && pendingFlushNum == 0);
-      out = new SegmentedRaftLogOutputStream(openFile, false, segmentMaxSize,
-          preallocatedSize, bufferSize);
+      Preconditions.assertTrue(pendingFlushNum == 0);
+      allocateSegmentedRaftLogOutputStream(openFile, false);
       Preconditions.assertTrue(openFile.exists(), "Failed to create file %s for %s",
           openFile.getAbsolutePath(), name);
       LOG.info("{}: created new log segment {}", name, openFile);
@@ -596,8 +597,7 @@ class SegmentedRaftLogWorker implements Runnable {
 
     @Override
     void execute() throws IOException {
-      IOUtils.cleanup(null, out);
-      out = null;
+      freeSegmentedRaftLogOutputStream();
       CompletableFuture<Void> stateMachineFuture = null;
       if (stateMachine != null) {
         stateMachineFuture = stateMachine.truncateStateMachineData(truncateIndex);
@@ -672,4 +672,16 @@ class SegmentedRaftLogWorker implements Runnable {
   long getFlushIndex() {
     return flushIndex.get();
   }
+
+  private void freeSegmentedRaftLogOutputStream() {
+    IOUtils.cleanup(LOG, out);
+    out = null;
+    Preconditions.assertTrue(writeBuffer.position() == 0);
+  }
+
+  private void allocateSegmentedRaftLogOutputStream(File file, boolean append) throws IOException {
+    Preconditions.assertTrue(out == null && writeBuffer.position() == 0);
+    out = new SegmentedRaftLogOutputStream(file, append, segmentMaxSize,
+            preallocatedSize, writeBuffer);
+  }
 }
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index 5fb0312..32ab38a 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -51,6 +51,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumMap;
@@ -255,7 +256,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
     LOG.debug("Taking a snapshot with t:{}, i:{}, file:{}", termIndex.getTerm(),
         termIndex.getIndex(), snapshotFile);
     try (SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(snapshotFile, false,
-        segmentMaxSize, preallocatedSize, bufferSize)) {
+        segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize))) {
       for (final LogEntryProto entry : indexMap.values()) {
         if (entry.getIndex() > endIndex) {
           break;
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
index 0e9eb9f..032c758 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
@@ -42,6 +42,7 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Arrays;
@@ -95,7 +96,7 @@ public class TestLogSegment extends BaseTest {
 
     final LogEntryProto[] entries = new LogEntryProto[numEntries];
     try (SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(file, false,
-        segmentMaxSize, preallocatedSize, bufferSize)) {
+        segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize))) {
       for (int i = 0; i < entries.length; i++) {
         SimpleOperation op = new SimpleOperation("m" + i);
         entries[i] = ServerProtoUtils.toLogEntryProto(op.getLogEntryContent(), term, i + startIndex);
@@ -295,7 +296,7 @@ public class TestLogSegment extends BaseTest {
     // make sure preallocation is correct with different max/pre-allocated size
     for (int max : maxSizes) {
       for (int a : preallocated) {
-        try(SegmentedRaftLogOutputStream ignored = new SegmentedRaftLogOutputStream(file, false, max, a, bufferSize)) {
+        try(SegmentedRaftLogOutputStream ignored = new SegmentedRaftLogOutputStream(file, false, max, a, ByteBuffer.allocateDirect(bufferSize))) {
           Assert.assertEquals("max=" + max + ", a=" + a, file.length(), Math.min(max, a));
         }
         try(SegmentedRaftLogInputStream in = new SegmentedRaftLogInputStream(file, 0, INVALID_LOG_INDEX, true)) {
@@ -310,7 +311,7 @@ public class TestLogSegment extends BaseTest {
     Arrays.fill(content, (byte) 1);
     final long size;
     try (SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(file, false,
-        1024, 1024, bufferSize)) {
+        1024, 1024, ByteBuffer.allocateDirect(bufferSize))) {
       SimpleOperation op = new SimpleOperation(new String(content));
       LogEntryProto entry = ServerProtoUtils.toLogEntryProto(op.getLogEntryContent(), 0, 0);
       size = LogSegment.getEntrySize(entry);
@@ -345,7 +346,7 @@ public class TestLogSegment extends BaseTest {
     long totalSize = SegmentedRaftLogFormat.getHeaderLength();
     long preallocated = 16 * 1024;
     try (SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(file, false,
-        max.getSize(), 16 * 1024, 10 * 1024)) {
+        max.getSize(), 16 * 1024, ByteBuffer.allocateDirect(10 * 1024))) {
       Assert.assertEquals(preallocated, file.length());
       while (totalSize + entrySize < max.getSize()) {
         totalSize += entrySize;
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestRaftLogReadWrite.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestRaftLogReadWrite.java
index 7edf1c0..c4b64c0 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestRaftLogReadWrite.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestRaftLogReadWrite.java
@@ -108,7 +108,7 @@ public class TestRaftLogReadWrite extends BaseTest {
 
     final LogEntryProto[] entries = new LogEntryProto[100];
     try (SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(openSegment, false,
-        segmentMaxSize, preallocatedSize, bufferSize)) {
+        segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize))) {
       size += writeMessages(entries, out);
     } finally {
       storage.close();
@@ -127,7 +127,7 @@ public class TestRaftLogReadWrite extends BaseTest {
     File openSegment = storage.getStorageDir().getOpenLogFile(0);
     LogEntryProto[] entries = new LogEntryProto[200];
     try (SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(openSegment, false,
-        segmentMaxSize, preallocatedSize, bufferSize)) {
+        segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize))) {
       for (int i = 0; i < 100; i++) {
         SimpleOperation m = new SimpleOperation("m" + i);
         entries[i] = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i);
@@ -136,7 +136,7 @@ public class TestRaftLogReadWrite extends BaseTest {
     }
 
     try (SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(openSegment, true,
-        segmentMaxSize, preallocatedSize, bufferSize)) {
+        segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize))) {
       for (int i = 100; i < 200; i++) {
         SimpleOperation m = new SimpleOperation("m" + i);
         entries[i] = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i);
@@ -163,7 +163,7 @@ public class TestRaftLogReadWrite extends BaseTest {
 
     LogEntryProto[] entries = new LogEntryProto[100];
     final SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(openSegment, false,
-        segmentMaxSize, preallocatedSize, bufferSize);
+        segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize));
     size += writeMessages(entries, out);
     out.flush();
 
@@ -192,7 +192,7 @@ public class TestRaftLogReadWrite extends BaseTest {
 
     LogEntryProto[] entries = new LogEntryProto[10];
     final SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(openSegment, false,
-        16 * 1024 * 1024, 4 * 1024 * 1024, bufferSize);
+        16 * 1024 * 1024, 4 * 1024 * 1024, ByteBuffer.allocateDirect(bufferSize));
     for (int i = 0; i < 10; i++) {
       SimpleOperation m = new SimpleOperation("m" + i);
       entries[i] = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i);
@@ -239,7 +239,7 @@ public class TestRaftLogReadWrite extends BaseTest {
     RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR);
     File openSegment = storage.getStorageDir().getOpenLogFile(0);
     try (SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(openSegment, false,
-        segmentMaxSize, preallocatedSize, bufferSize)) {
+        segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize))) {
       for (int i = 0; i < 100; i++) {
         LogEntryProto entry = ServerProtoUtils.toLogEntryProto(
             new SimpleOperation("m" + i).getLogEntryContent(), 0, i);
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index 9bc27ff..8b686fb 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -52,6 +52,7 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -142,7 +143,7 @@ public class TestSegmentedRaftLog extends BaseTest {
       final int size = (int) (range.end - range.start + 1);
       LogEntryProto[] entries = new LogEntryProto[size];
       try (SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(file, false,
-          segmentMaxSize, preallocatedSize, bufferSize)) {
+          segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize))) {
         for (int i = 0; i < size; i++) {
           SimpleOperation m = new SimpleOperation("m" + (i + range.start));
           entries[i] = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), range.term, i + range.start);