You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/08/21 18:15:22 UTC

[bookkeeper] branch master updated: Ensure BufferedChannel instance is properly closed

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new dd964d6  Ensure BufferedChannel instance is properly closed
dd964d6 is described below

commit dd964d6388987d2431b9fc5afa87d6e2e993feb8
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Aug 21 11:15:14 2018 -0700

    Ensure BufferedChannel instance is properly closed
    
    Whenever the the `EntryLogger` is closing the `BufferedChannel` instance (the channel used for writing) it is closing the file descriptor but it's not calling `close()` on the object itself.
    
    This causes a small mem leak for each log file, because the `writeBuffer` from `BufferedChannel` is never released.
    
    I think this should be considered for a 4.7.1 release.
    
    Author: Matteo Merli <mm...@apache.org>
    
    Reviewers: Ivan Kelly <iv...@apache.org>, Enrico Olivelli <eo...@gmail.com>, Charan Reddy Guttapalem <re...@gmail.com>, Matteo Merli <mm...@apache.org>, Sijie Guo <si...@apache.org>
    
    This closes #1414 from merlimat/fix-entry-log-leak
---
 .../apache/bookkeeper/bookie/BufferedChannel.java  |  6 ++--
 .../bookkeeper/bookie/BufferedChannelBase.java     |  7 ----
 .../EntryLogManagerForEntryLogPerLedger.java       |  9 ++++--
 .../bookie/EntryLogManagerForSingleEntryLog.java   |  9 +++---
 .../org/apache/bookkeeper/bookie/EntryLogger.java  | 37 ++++++----------------
 .../java/org/apache/bookkeeper/bookie/Journal.java |  5 +--
 .../apache/bookkeeper/bookie/JournalChannel.java   |  5 ++-
 7 files changed, 29 insertions(+), 49 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
index 53628cf..b2dd4be 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
@@ -23,6 +23,7 @@ package org.apache.bookkeeper.bookie;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
+import io.netty.util.ReferenceCountUtil;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -86,8 +87,9 @@ public class BufferedChannel extends BufferedReadChannel implements Closeable {
     }
 
     @Override
-    public void close() throws IOException {
-        writeBuffer.release();
+    public synchronized void close() throws IOException {
+        ReferenceCountUtil.safeRelease(writeBuffer);
+        fileChannel.close();
     }
 
     /**
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannelBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannelBase.java
index 87e1d43..cfaee56 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannelBase.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannelBase.java
@@ -49,11 +49,4 @@ public abstract class BufferedChannelBase {
         return validateAndGetFileChannel().size();
     }
 
-    /**
-     * Get the {@link FileChannel} that this BufferedChannel wraps around.
-     * @return
-     */
-    public FileChannel getFileChannel() {
-        return fileChannel;
-    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java
index 96f38c5..452e996 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java
@@ -61,6 +61,7 @@ import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.IOUtils;
 import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
 import org.apache.commons.lang3.mutable.MutableInt;
 
@@ -554,7 +555,9 @@ class EntryLogManagerForEntryLogPerLedger extends EntryLogManagerBase {
     public void close() throws IOException {
         Set<BufferedLogChannelWithDirInfo> copyOfCurrentLogsWithDirInfo = getCopyOfCurrentLogs();
         for (BufferedLogChannelWithDirInfo currentLogWithDirInfo : copyOfCurrentLogsWithDirInfo) {
-            EntryLogger.closeFileChannel(currentLogWithDirInfo.getLogChannel());
+            if (currentLogWithDirInfo.getLogChannel() != null) {
+                currentLogWithDirInfo.getLogChannel().close();
+            }
         }
     }
 
@@ -562,7 +565,7 @@ class EntryLogManagerForEntryLogPerLedger extends EntryLogManagerBase {
     public void forceClose() {
         Set<BufferedLogChannelWithDirInfo> copyOfCurrentLogsWithDirInfo = getCopyOfCurrentLogs();
         for (BufferedLogChannelWithDirInfo currentLogWithDirInfo : copyOfCurrentLogsWithDirInfo) {
-            EntryLogger.forceCloseFileChannel(currentLogWithDirInfo.getLogChannel());
+            IOUtils.close(log, currentLogWithDirInfo.getLogChannel());
         }
     }
 
@@ -652,7 +655,7 @@ class EntryLogManagerForEntryLogPerLedger extends EntryLogManagerBase {
             // since this channel is only used for writing, after flushing the channel,
             // we had to close the underlying file channel. Otherwise, we might end up
             // leaking fds which cause the disk spaces could not be reclaimed.
-            EntryLogger.closeFileChannel(channel);
+            channel.close();
             recentlyCreatedEntryLogsStatus.flushRotatedEntryLog(channel.getLogId());
             rotatedLogChannels.remove(channel);
             log.info("Synced entry logger {} to disk.", channel.getLogId());
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java
index 84e4ad3..3e552d0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java
@@ -36,6 +36,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.util.IOUtils;
 
 @Slf4j
 class EntryLogManagerForSingleEntryLog extends EntryLogManagerBase {
@@ -202,7 +203,7 @@ class EntryLogManagerForSingleEntryLog extends EntryLogManagerBase {
             // since this channel is only used for writing, after flushing the channel,
             // we had to close the underlying file channel. Otherwise, we might end up
             // leaking fds which cause the disk spaces could not be reclaimed.
-            EntryLogger.closeFileChannel(channel);
+            channel.close();
             recentlyCreatedEntryLogsStatus.flushRotatedEntryLog(channel.getLogId());
             log.info("Synced entry logger {} to disk.", channel.getLogId());
         }
@@ -211,15 +212,13 @@ class EntryLogManagerForSingleEntryLog extends EntryLogManagerBase {
     @Override
     public void close() throws IOException {
         if (activeLogChannel != null) {
-            EntryLogger.closeFileChannel(activeLogChannel);
+            activeLogChannel.close();
         }
     }
 
     @Override
     public void forceClose() {
-        if (activeLogChannel != null) {
-            EntryLogger.forceCloseFileChannel(activeLogChannel);
-        }
+        IOUtils.close(log, activeLogChannel);
     }
 
     @Override
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index f8eacbd..ddf255a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -650,7 +650,7 @@ public class EntryLogger {
                 // since this channel is only used for writing, after flushing the channel,
                 // we had to close the underlying file channel. Otherwise, we might end up
                 // leaking fds which cause the disk spaces could not be reclaimed.
-                closeFileChannel(compactionLogChannel);
+                compactionLogChannel.close();
             } else {
                 throw new IOException("Failed to flush compaction log which has already been removed.");
             }
@@ -675,10 +675,12 @@ public class EntryLogger {
                 if (!compactionLogChannel.getLogFile().delete()) {
                     LOG.warn("Could not delete compaction log file {}", compactionLogChannel.getLogFile());
                 }
+
                 try {
-                    closeFileChannel(compactionLogChannel);
+                    compactionLogChannel.close();
                 } catch (IOException e) {
-                    LOG.error("Failed to close file channel for compaction log {}", compactionLogChannel.getLogId());
+                    LOG.error("Failed to close file channel for compaction log {}", compactionLogChannel.getLogId(),
+                            e);
                 }
                 compactionLogChannel = null;
             }
@@ -1069,8 +1071,10 @@ public class EntryLogger {
             logid2FileChannel.clear();
             entryLogManager.close();
             synchronized (compactionLogLock) {
-                closeFileChannel(compactionLogChannel);
-                compactionLogChannel = null;
+                if (compactionLogChannel != null) {
+                    compactionLogChannel.close();
+                    compactionLogChannel = null;
+                }
             }
         } catch (IOException ie) {
             // we have no idea how to avoid io exception during shutting down, so just ignore it
@@ -1082,34 +1086,13 @@ public class EntryLogger {
 
             entryLogManager.forceClose();
             synchronized (compactionLogLock) {
-                forceCloseFileChannel(compactionLogChannel);
+                IOUtils.close(LOG, compactionLogChannel);
             }
         }
         // shutdown the pre-allocation thread
         entryLoggerAllocator.stop();
     }
 
-    static void closeFileChannel(BufferedChannelBase channel) throws IOException {
-        if (null == channel) {
-            return;
-        }
-
-        FileChannel fileChannel = channel.getFileChannel();
-        if (null != fileChannel) {
-            fileChannel.close();
-        }
-    }
-
-    static void forceCloseFileChannel(BufferedChannelBase channel) {
-        if (null == channel) {
-            return;
-        }
-        FileChannel fileChannel = channel.getFileChannel();
-        if (null != fileChannel) {
-            IOUtils.close(LOG, fileChannel);
-        }
-    }
-
     protected LedgerDirsManager getLedgerDirsManager() {
         return ledgerDirsManager;
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 0715db4..1b0d707 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -1084,9 +1084,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
                             // check whether journal file is over file limit
                             if (shouldRolloverJournal) {
                                 // if the journal file is rolled over, the journal file will be closed after last
-                                // entry is force written to disk. the `bc` is not used anymore, so close it to release
-                                // the buffers in `bc`.
-                                IOUtils.close(LOG, bc);
+                                // entry is force written to disk.
                                 logFile = null;
                                 continue;
                             }
@@ -1146,7 +1144,6 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
             // close will flush the file system cache making any previous
             // cached writes durable so this is fine as well.
             IOUtils.close(LOG, bc);
-            IOUtils.close(LOG, logFile);
         }
         LOG.info("Journal exited loop!");
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
index 420bd07..f8a7230 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
@@ -257,8 +257,11 @@ class JournalChannel implements Closeable {
         return fc.read(dst);
     }
 
+    @Override
     public void close() throws IOException {
-        fc.close();
+        if (bc != null) {
+            bc.close();
+        }
     }
 
     public void forceWrite(boolean forceMetadata) throws IOException {