You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/08/21 18:15:25 UTC

[GitHub] sijie closed pull request #1414: Ensure BufferedChannel instance is properly closed

sijie closed pull request #1414: Ensure BufferedChannel instance is properly closed
URL: https://github.com/apache/bookkeeper/pull/1414
 
 
   

This is a PR merged from a forked repository. Because GitHub does not
show the original diff of a foreign (forked) pull request once it has
been merged, the diff is reproduced below for the sake of provenance:

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
index 53628cfd1f..b2dd4be605 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
@@ -23,6 +23,7 @@
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
+import io.netty.util.ReferenceCountUtil;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -86,8 +87,9 @@ public BufferedChannel(FileChannel fc, int writeCapacity, int readCapacity, long
     }
 
     @Override
-    public void close() throws IOException {
-        writeBuffer.release();
+    public synchronized void close() throws IOException {
+        ReferenceCountUtil.safeRelease(writeBuffer);
+        fileChannel.close();
     }
 
     /**
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannelBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannelBase.java
index 87e1d4355e..cfaee56b41 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannelBase.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannelBase.java
@@ -49,11 +49,4 @@ public long size() throws IOException {
         return validateAndGetFileChannel().size();
     }
 
-    /**
-     * Get the {@link FileChannel} that this BufferedChannel wraps around.
-     * @return
-     */
-    public FileChannel getFileChannel() {
-        return fileChannel;
-    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java
index 96f38c5945..452e99607c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java
@@ -61,6 +61,7 @@
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.IOUtils;
 import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
 import org.apache.commons.lang3.mutable.MutableInt;
 
@@ -554,7 +555,9 @@ public File getDirForNextEntryLog(List<File> writableLedgerDirs) {
     public void close() throws IOException {
         Set<BufferedLogChannelWithDirInfo> copyOfCurrentLogsWithDirInfo = getCopyOfCurrentLogs();
         for (BufferedLogChannelWithDirInfo currentLogWithDirInfo : copyOfCurrentLogsWithDirInfo) {
-            EntryLogger.closeFileChannel(currentLogWithDirInfo.getLogChannel());
+            if (currentLogWithDirInfo.getLogChannel() != null) {
+                currentLogWithDirInfo.getLogChannel().close();
+            }
         }
     }
 
@@ -562,7 +565,7 @@ public void close() throws IOException {
     public void forceClose() {
         Set<BufferedLogChannelWithDirInfo> copyOfCurrentLogsWithDirInfo = getCopyOfCurrentLogs();
         for (BufferedLogChannelWithDirInfo currentLogWithDirInfo : copyOfCurrentLogsWithDirInfo) {
-            EntryLogger.forceCloseFileChannel(currentLogWithDirInfo.getLogChannel());
+            IOUtils.close(log, currentLogWithDirInfo.getLogChannel());
         }
     }
 
@@ -652,7 +655,7 @@ public void flushRotatedLogs() throws IOException {
             // since this channel is only used for writing, after flushing the channel,
             // we had to close the underlying file channel. Otherwise, we might end up
             // leaking fds which cause the disk spaces could not be reclaimed.
-            EntryLogger.closeFileChannel(channel);
+            channel.close();
             recentlyCreatedEntryLogsStatus.flushRotatedEntryLog(channel.getLogId());
             rotatedLogChannels.remove(channel);
             log.info("Synced entry logger {} to disk.", channel.getLogId());
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java
index 84e4ad36f2..3e552d0fca 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java
@@ -36,6 +36,7 @@
 import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.util.IOUtils;
 
 @Slf4j
 class EntryLogManagerForSingleEntryLog extends EntryLogManagerBase {
@@ -202,7 +203,7 @@ void flushRotatedLogs() throws IOException {
             // since this channel is only used for writing, after flushing the channel,
             // we had to close the underlying file channel. Otherwise, we might end up
             // leaking fds which cause the disk spaces could not be reclaimed.
-            EntryLogger.closeFileChannel(channel);
+            channel.close();
             recentlyCreatedEntryLogsStatus.flushRotatedEntryLog(channel.getLogId());
             log.info("Synced entry logger {} to disk.", channel.getLogId());
         }
@@ -211,15 +212,13 @@ void flushRotatedLogs() throws IOException {
     @Override
     public void close() throws IOException {
         if (activeLogChannel != null) {
-            EntryLogger.closeFileChannel(activeLogChannel);
+            activeLogChannel.close();
         }
     }
 
     @Override
     public void forceClose() {
-        if (activeLogChannel != null) {
-            EntryLogger.forceCloseFileChannel(activeLogChannel);
-        }
+        IOUtils.close(log, activeLogChannel);
     }
 
     @Override
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index f8eacbdd96..ddf255a3b3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -650,7 +650,7 @@ void flushCompactionLog() throws IOException {
                 // since this channel is only used for writing, after flushing the channel,
                 // we had to close the underlying file channel. Otherwise, we might end up
                 // leaking fds which cause the disk spaces could not be reclaimed.
-                closeFileChannel(compactionLogChannel);
+                compactionLogChannel.close();
             } else {
                 throw new IOException("Failed to flush compaction log which has already been removed.");
             }
@@ -675,10 +675,12 @@ void removeCurCompactionLog() {
                 if (!compactionLogChannel.getLogFile().delete()) {
                     LOG.warn("Could not delete compaction log file {}", compactionLogChannel.getLogFile());
                 }
+
                 try {
-                    closeFileChannel(compactionLogChannel);
+                    compactionLogChannel.close();
                 } catch (IOException e) {
-                    LOG.error("Failed to close file channel for compaction log {}", compactionLogChannel.getLogId());
+                    LOG.error("Failed to close file channel for compaction log {}", compactionLogChannel.getLogId(),
+                            e);
                 }
                 compactionLogChannel = null;
             }
@@ -1069,8 +1071,10 @@ public void shutdown() {
             logid2FileChannel.clear();
             entryLogManager.close();
             synchronized (compactionLogLock) {
-                closeFileChannel(compactionLogChannel);
-                compactionLogChannel = null;
+                if (compactionLogChannel != null) {
+                    compactionLogChannel.close();
+                    compactionLogChannel = null;
+                }
             }
         } catch (IOException ie) {
             // we have no idea how to avoid io exception during shutting down, so just ignore it
@@ -1082,34 +1086,13 @@ public void shutdown() {
 
             entryLogManager.forceClose();
             synchronized (compactionLogLock) {
-                forceCloseFileChannel(compactionLogChannel);
+                IOUtils.close(LOG, compactionLogChannel);
             }
         }
         // shutdown the pre-allocation thread
         entryLoggerAllocator.stop();
     }
 
-    static void closeFileChannel(BufferedChannelBase channel) throws IOException {
-        if (null == channel) {
-            return;
-        }
-
-        FileChannel fileChannel = channel.getFileChannel();
-        if (null != fileChannel) {
-            fileChannel.close();
-        }
-    }
-
-    static void forceCloseFileChannel(BufferedChannelBase channel) {
-        if (null == channel) {
-            return;
-        }
-        FileChannel fileChannel = channel.getFileChannel();
-        if (null != fileChannel) {
-            IOUtils.close(LOG, fileChannel);
-        }
-    }
-
     protected LedgerDirsManager getLedgerDirsManager() {
         return ledgerDirsManager;
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 0715db4887..1b0d7070ba 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -1084,9 +1084,7 @@ public void run() {
                             // check whether journal file is over file limit
                             if (shouldRolloverJournal) {
                                 // if the journal file is rolled over, the journal file will be closed after last
-                                // entry is force written to disk. the `bc` is not used anymore, so close it to release
-                                // the buffers in `bc`.
-                                IOUtils.close(LOG, bc);
+                                // entry is force written to disk.
                                 logFile = null;
                                 continue;
                             }
@@ -1146,7 +1144,6 @@ public void run() {
             // close will flush the file system cache making any previous
             // cached writes durable so this is fine as well.
             IOUtils.close(LOG, bc);
-            IOUtils.close(LOG, logFile);
         }
         LOG.info("Journal exited loop!");
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
index 420bd07e43..f8a7230aaf 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
@@ -257,8 +257,11 @@ int read(ByteBuffer dst)
         return fc.read(dst);
     }
 
+    @Override
     public void close() throws IOException {
-        fc.close();
+        if (bc != null) {
+            bc.close();
+        }
     }
 
     public void forceWrite(boolean forceMetadata) throws IOException {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services