You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ra...@apache.org on 2012/05/26 19:13:17 UTC
svn commit: r1342929 -
/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
Author: ramkrishna
Date: Sat May 26 17:13:17 2012
New Revision: 1342929
URL: http://svn.apache.org/viewvc?rev=1342929&view=rev
Log:
HBASE-6002 Possible chance of resource leak in HlogSplitter (Chinna Rao)
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1342929&r1=1342928&r2=1342929&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java Sat May 26 17:13:17 2012
@@ -456,51 +456,79 @@ public class HLogSplitter {
e = RemoteExceptionHandler.checkIOException(e);
throw e;
} finally {
- int n = 0;
- for (Map.Entry<byte[], Object> logWritersEntry : logWriters.entrySet()) {
- Object o = logWritersEntry.getValue();
- long t1 = EnvironmentEdgeManager.currentTimeMillis();
- if ((t1 - last_report_at) > period) {
- last_report_at = t;
- if ((progress_failed == false) && (reporter != null) &&
- (reporter.progress() == false)) {
- progress_failed = true;
+ boolean allWritersClosed = false;
+ try {
+ int n = 0;
+ for (Map.Entry<byte[], Object> logWritersEntry : logWriters.entrySet()) {
+ Object o = logWritersEntry.getValue();
+ long t1 = EnvironmentEdgeManager.currentTimeMillis();
+ if ((t1 - last_report_at) > period) {
+ last_report_at = t;
+ if ((progress_failed == false) && (reporter != null)
+ && (reporter.progress() == false)) {
+ progress_failed = true;
+ }
+ }
+ if (o == BAD_WRITER) {
+ continue;
+ }
+ n++;
+ WriterAndPath wap = (WriterAndPath) o;
+ try {
+ wap.writerClosed = true;
+ wap.w.close();
+ LOG.debug("Closed " + wap.p);
+ } catch (IOException e) {
+ LOG.debug("Exception while closing the writer :", e);
+ }
+ Path dst = getCompletedRecoveredEditsFilePath(wap.p, outputSink
+ .getRegionMaximumEditLogSeqNum(logWritersEntry.getKey()));
+ if (!dst.equals(wap.p) && fs.exists(dst)) {
+ LOG.warn("Found existing old edits file. It could be the "
+ + "result of a previous failed split attempt. Deleting " + dst
+ + ", length=" + fs.getFileStatus(dst).getLen());
+ if (!fs.delete(dst, false)) {
+ LOG.warn("Failed deleting of old " + dst);
+ throw new IOException("Failed deleting of old " + dst);
+ }
+ }
+ // Skip the rename for unit tests that create a splitter which reads
+ // and writes data without touching disk; in that case wap.p does not
+ // exist on the filesystem. TestHLogSplit#testThreading is an
+ // example.
+ if (fs.exists(wap.p)) {
+ if (!fs.rename(wap.p, dst)) {
+ throw new IOException("Failed renaming " + wap.p + " to " + dst);
+ }
+ LOG.debug("Rename " + wap.p + " to " + dst);
}
}
- if (o == BAD_WRITER) {
- continue;
+ allWritersClosed = true;
+ String msg = "Processed " + editsCount + " edits across " + n
+ + " regions" + " threw away edits for " + (logWriters.size() - n)
+ + " regions" + "; log file=" + logPath + " is corrupted = "
+ + isCorrupted + " progress failed = " + progress_failed;
+ LOG.info(msg);
+ status.markComplete(msg);
+ } finally {
+ if (!allWritersClosed) {
+ for (Map.Entry<byte[], Object> logWritersEntry : logWriters.entrySet()) {
+ Object o = logWritersEntry.getValue();
+ if (o != BAD_WRITER) {
+ WriterAndPath wap = (WriterAndPath) o;
+ try {
+ if (!wap.writerClosed) {
+ wap.writerClosed = true;
+ wap.w.close();
+ }
+ } catch (IOException e) {
+ LOG.debug("Exception while closing the writer :", e);
+ }
+ }
+ }
}
- n++;
- WriterAndPath wap = (WriterAndPath)o;
- wap.w.close();
- LOG.debug("Closed " + wap.p);
- Path dst = getCompletedRecoveredEditsFilePath(wap.p,
- outputSink.getRegionMaximumEditLogSeqNum(logWritersEntry.getKey()));
- if (!dst.equals(wap.p) && fs.exists(dst)) {
- LOG.warn("Found existing old edits file. It could be the "
- + "result of a previous failed split attempt. Deleting " + dst
- + ", length=" + fs.getFileStatus(dst).getLen());
- if (!fs.delete(dst, false)) {
- LOG.warn("Failed deleting of old " + dst);
- throw new IOException("Failed deleting of old " + dst);
- }
- }
- // Skip the unit tests which create a splitter that reads and writes the
- // data without touching disk. TestHLogSplit#testThreading is an
- // example.
- if (fs.exists(wap.p)) {
- if (!fs.rename(wap.p, dst)) {
- throw new IOException("Failed renaming " + wap.p + " to " + dst);
- }
- LOG.debug("Rename " + wap.p + " to " + dst);
- }
- }
- String msg = "Processed " + editsCount + " edits across " + n + " regions" +
- " threw away edits for " + (logWriters.size() - n) + " regions" +
- "; log file=" + logPath + " is corrupted = " + isCorrupted +
- " progress failed = " + progress_failed;
- LOG.info(msg);
- status.markComplete(msg);
+ in.close();
+ }
}
return !progress_failed;
}
@@ -1349,6 +1377,11 @@ public class HLogSplitter {
long editsWritten = 0;
/* Number of nanos spent writing to this log */
long nanosSpent = 0;
+
+ /* Records whether a close has already been attempted on the
+ * writer, so that it is not closed a second time
+ */
+ boolean writerClosed = false;
WriterAndPath(final Path p, final Writer w) {
this.p = p;