You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ra...@apache.org on 2012/05/26 19:13:17 UTC

svn commit: r1342929 - /hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java

Author: ramkrishna
Date: Sat May 26 17:13:17 2012
New Revision: 1342929

URL: http://svn.apache.org/viewvc?rev=1342929&view=rev
Log:
HBASE-6002 Possible chance of resource leak in HlogSplitter (Chinna Rao)

Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1342929&r1=1342928&r2=1342929&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java Sat May 26 17:13:17 2012
@@ -456,51 +456,79 @@ public class HLogSplitter {
       e = RemoteExceptionHandler.checkIOException(e);
       throw e;
     } finally {
-      int n = 0;
-      for (Map.Entry<byte[], Object> logWritersEntry : logWriters.entrySet()) {
-        Object o = logWritersEntry.getValue();
-        long t1 = EnvironmentEdgeManager.currentTimeMillis();
-        if ((t1 - last_report_at) > period) {
-          last_report_at = t;
-          if ((progress_failed == false) && (reporter != null) &&
-              (reporter.progress() == false)) {
-            progress_failed = true;
+      boolean allWritersClosed = false;
+      try {
+        int n = 0;
+        for (Map.Entry<byte[], Object> logWritersEntry : logWriters.entrySet()) {
+          Object o = logWritersEntry.getValue();
+          long t1 = EnvironmentEdgeManager.currentTimeMillis();
+          if ((t1 - last_report_at) > period) {
+            last_report_at = t;
+            if ((progress_failed == false) && (reporter != null)
+                && (reporter.progress() == false)) {
+              progress_failed = true;
+            }
+          }
+          if (o == BAD_WRITER) {
+            continue;
+          }
+          n++;
+          WriterAndPath wap = (WriterAndPath) o;
+          try {
+            wap.writerClosed = true;
+            wap.w.close();
+            LOG.debug("Closed " + wap.p);
+          } catch (IOException e) {
+            LOG.debug("Exception while closing the writer :", e);
+          }
+          Path dst = getCompletedRecoveredEditsFilePath(wap.p, outputSink
+              .getRegionMaximumEditLogSeqNum(logWritersEntry.getKey()));
+          if (!dst.equals(wap.p) && fs.exists(dst)) {
+            LOG.warn("Found existing old edits file. It could be the "
+                + "result of a previous failed split attempt. Deleting " + dst
+                + ", length=" + fs.getFileStatus(dst).getLen());
+            if (!fs.delete(dst, false)) {
+              LOG.warn("Failed deleting of old " + dst);
+              throw new IOException("Failed deleting of old " + dst);
+            }
+          }
+          // Skip the unit tests which create a splitter that reads and writes
+          // the
+          // data without touching disk. TestHLogSplit#testThreading is an
+          // example.
+          if (fs.exists(wap.p)) {
+            if (!fs.rename(wap.p, dst)) {
+              throw new IOException("Failed renaming " + wap.p + " to " + dst);
+            }
+            LOG.debug("Rename " + wap.p + " to " + dst);
           }
         }
-        if (o == BAD_WRITER) {
-          continue;
+        allWritersClosed = true;
+        String msg = "Processed " + editsCount + " edits across " + n
+            + " regions" + " threw away edits for " + (logWriters.size() - n)
+            + " regions" + "; log file=" + logPath + " is corrupted = "
+            + isCorrupted + " progress failed = " + progress_failed;
+        LOG.info(msg);
+        status.markComplete(msg);
+      } finally {
+        if (!allWritersClosed) {
+          for (Map.Entry<byte[], Object> logWritersEntry : logWriters.entrySet()) {
+            Object o = logWritersEntry.getValue();
+            if (o != BAD_WRITER) {
+              WriterAndPath wap = (WriterAndPath) o;
+              try {
+                if (!wap.writerClosed) {
+                  wap.writerClosed = true;
+                  wap.w.close();
+                }
+              } catch (IOException e) {
+                LOG.debug("Exception while closing the writer :", e);
+              }
+            }
+          }
         }
-        n++;
-        WriterAndPath wap = (WriterAndPath)o;
-        wap.w.close();
-        LOG.debug("Closed " + wap.p);
-        Path dst = getCompletedRecoveredEditsFilePath(wap.p,
-          outputSink.getRegionMaximumEditLogSeqNum(logWritersEntry.getKey()));
-        if (!dst.equals(wap.p) && fs.exists(dst)) {
-          LOG.warn("Found existing old edits file. It could be the "
-              + "result of a previous failed split attempt. Deleting " + dst
-              + ", length=" + fs.getFileStatus(dst).getLen());
-          if (!fs.delete(dst, false)) {
-            LOG.warn("Failed deleting of old " + dst);
-            throw new IOException("Failed deleting of old " + dst);
-          }
-        }
-        // Skip the unit tests which create a splitter that reads and writes the
-        // data without touching disk. TestHLogSplit#testThreading is an
-        // example.
-        if (fs.exists(wap.p)) {
-          if (!fs.rename(wap.p, dst)) {
-            throw new IOException("Failed renaming " + wap.p + " to " + dst);
-          }
-          LOG.debug("Rename " + wap.p + " to " + dst);
-        }
-      }
-      String msg = "Processed " + editsCount + " edits across " + n + " regions" +
-        " threw away edits for " + (logWriters.size() - n) + " regions" +
-        "; log file=" + logPath + " is corrupted = " + isCorrupted +
-        " progress failed = " + progress_failed;
-      LOG.info(msg);
-      status.markComplete(msg);
+        in.close();
+      }
     }
     return !progress_failed;
   }
@@ -1349,6 +1377,11 @@ public class HLogSplitter {
     long editsWritten = 0;
     /* Number of nanos spent writing to this log */
     long nanosSpent = 0;
+    
+    /* To check whether a close has already been tried on the
+     * writer
+     */
+    boolean writerClosed = false;
 
     WriterAndPath(final Path p, final Writer w) {
       this.p = p;