Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 19:43:05 UTC
svn commit: r1181938 - in
/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver:
HRegion.java Store.java wal/HLog.java
Author: nspiegelberg
Date: Tue Oct 11 17:43:05 2011
New Revision: 1181938
URL: http://svn.apache.org/viewvc?rev=1181938&view=rev
Log:
Prevent leaks of MonitoredTasks
Summary: Wherever a MonitoredTask was used without being cleaned up in a
try ... finally block, I added one as a failsafe against leaks (a sketch of
the pattern follows the list of modified files below).
Test Plan: Compiled and run on dev server.
Reviewed By: nspiegelberg
Reviewers: nspiegelberg, kannan
CC: hbase@lists, nspiegelberg
Differential Revision: 305954
Modified:
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
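
The failsafe applied throughout this change has one shape: create the
MonitoredTask, do the work inside a try block, and release the task in a
finally block so it is reclaimed on every exit path, normal or exceptional.
A minimal sketch of that shape (doWork() is a hypothetical stand-in; the
org.apache.hadoop.hbase.monitoring package is assumed from this branch's
layout, and calling cleanup() after markComplete() is taken to be safe, as
the diffs below rely on):

    import org.apache.hadoop.hbase.monitoring.MonitoredTask;
    import org.apache.hadoop.hbase.monitoring.TaskMonitor;

    public class MonitoredTaskPattern {
      public static void main(String[] args) throws Exception {
        MonitoredTask status = TaskMonitor.get().createStatus("Example task");
        try {
          status.setStatus("Working");
          doWork(); // may throw; without the finally below the task would leak
          status.markComplete("Work finished");
        } finally {
          // Runs on normal return and on exception alike, so the task is
          // always released back to the TaskMonitor.
          status.cleanup();
        }
      }

      // Hypothetical stand-in for the real work (flush, compaction, log split).
      private static void doWork() throws Exception {
      }
    }
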
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1181938&r1=1181937&r2=1181938&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Oct 11 17:43:05 2011
@@ -475,49 +475,54 @@ public class HRegion implements HeapSize
throws IOException {
MonitoredTask status = TaskMonitor.get().createStatus(
"Initializing region " + this);
- // Write HRI to a file in case we need to recover .META.
- status.setStatus("Writing region info on filesystem");
- checkRegioninfoOnFilesystem();
-
- // Remove temporary data left over from old regions
- status.setStatus("Cleaning up temporary data from old regions");
- cleanupTmpDir();
-
- // Load in all the HStores. Get maximum seqid.
- long maxSeqId = -1;
- for (HColumnDescriptor c : this.regionInfo.getTableDesc().getFamilies()) {
- status.setStatus("Instantiating store for column family " + c);
- Store store = instantiateHStore(this.tableDir, c);
- this.stores.put(c.getName(), store);
- long storeSeqId = store.getMaxSequenceId();
- if (storeSeqId > maxSeqId) {
- maxSeqId = storeSeqId;
- }
- }
- // Recover any edits if available.
- maxSeqId = replayRecoveredEditsIfAny(
- this.regiondir, maxSeqId, reporter, status);
-
- // Get rid of any splits or merges that were lost in-progress. Clean out
- // these directories here on open. We may be opening a region that was
- // being split but we crashed in the middle of it all.
- status.setStatus("Cleaning up detritus from prior splits");
- FSUtils.deleteDirectory(this.fs, new Path(regiondir, SPLITDIR));
- FSUtils.deleteDirectory(this.fs, new Path(regiondir, MERGEDIR));
-
- // See if region is meant to run read-only.
- if (this.regionInfo.getTableDesc().isReadOnly()) {
- this.writestate.setReadOnly(true);
- }
-
- this.writestate.compacting = 0;
- this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
- // Use maximum of log sequenceid or that which was found in stores
- // (particularly if no recovered edits, seqid will be -1).
- long nextSeqid = maxSeqId + 1;
- LOG.info("Onlined " + this.toString() + "; next sequenceid=" + nextSeqid);
- status.markComplete("Region opened successfully");
- return nextSeqid;
+ try {
+ // Write HRI to a file in case we need to recover .META.
+ status.setStatus("Writing region info on filesystem");
+ checkRegioninfoOnFilesystem();
+
+ // Remove temporary data left over from old regions
+ status.setStatus("Cleaning up temporary data from old regions");
+ cleanupTmpDir();
+
+ // Load in all the HStores. Get maximum seqid.
+ long maxSeqId = -1;
+ for (HColumnDescriptor c : this.regionInfo.getTableDesc().getFamilies()) {
+ status.setStatus("Instantiating store for column family " + c);
+ Store store = instantiateHStore(this.tableDir, c);
+ this.stores.put(c.getName(), store);
+ long storeSeqId = store.getMaxSequenceId();
+ if (storeSeqId > maxSeqId) {
+ maxSeqId = storeSeqId;
+ }
+ }
+ // Recover any edits if available.
+ maxSeqId = replayRecoveredEditsIfAny(
+ this.regiondir, maxSeqId, reporter, status);
+
+ // Get rid of any splits or merges that were lost in-progress. Clean out
+ // these directories here on open. We may be opening a region that was
+ // being split but we crashed in the middle of it all.
+ status.setStatus("Cleaning up detritus from prior splits");
+ FSUtils.deleteDirectory(this.fs, new Path(regiondir, SPLITDIR));
+ FSUtils.deleteDirectory(this.fs, new Path(regiondir, MERGEDIR));
+
+ // See if region is meant to run read-only.
+ if (this.regionInfo.getTableDesc().isReadOnly()) {
+ this.writestate.setReadOnly(true);
+ }
+
+ this.writestate.compacting = 0;
+ this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
+ // Use maximum of log sequenceid or that which was found in stores
+ // (particularly if no recovered edits, seqid will be -1).
+ long nextSeqid = maxSeqId + 1;
+ LOG.info("Onlined " + this.toString() + "; next sequenceid=" + nextSeqid);
+ status.markComplete("Region opened successfully");
+ return nextSeqid;
+ } finally {
+ // prevent MonitoredTask leaks due to thrown exceptions
+ status.cleanup();
+ }
}
/*
@@ -725,6 +730,8 @@ public class HRegion implements HeapSize
}
} finally {
newScannerLock.writeLock().unlock();
+ // prevent MonitoredTask leaks due to thrown exceptions
+ status.cleanup();
}
}
}
@@ -1103,43 +1110,47 @@ public class HRegion implements HeapSize
*/
public boolean flushcache() throws IOException {
MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
- if (this.closed.get()) {
- status.abort("Skipped: closed");
- return false;
- }
- synchronized (writestate) {
- if (!writestate.flushing && writestate.writesEnabled) {
- this.writestate.flushing = true;
- } else {
- if(LOG.isDebugEnabled()) {
- LOG.debug("NOT flushing memstore for region " + this +
- ", flushing=" +
- writestate.flushing + ", writesEnabled=" +
- writestate.writesEnabled);
- }
- status.abort("Not flushing since " + (writestate.flushing ?
- "already flushing" : "writes not enabled"));
+ try {
+ if (this.closed.get()) {
+ status.abort("Skipped: closed");
return false;
}
- }
- try {
- // Prevent splits and closes
- status.setStatus("Acquiring readlock on region");
- splitsAndClosesLock.readLock().lock();
+ synchronized (writestate) {
+ if (!writestate.flushing && writestate.writesEnabled) {
+ this.writestate.flushing = true;
+ } else {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("NOT flushing memstore for region " + this +
+ ", flushing=" +
+ writestate.flushing + ", writesEnabled=" +
+ writestate.writesEnabled);
+ }
+ status.abort("Not flushing since " + (writestate.flushing ?
+ "already flushing" : "writes not enabled"));
+ return false;
+ }
+ }
try {
- boolean result = internalFlushcache(status);
- status.markComplete("Flush successful");
- return result;
+ // Prevent splits and closes
+ status.setStatus("Acquiring readlock on region");
+ splitsAndClosesLock.readLock().lock();
+ try {
+ boolean result = internalFlushcache(status);
+ status.markComplete("Flush successful");
+ return result;
+ } finally {
+ splitsAndClosesLock.readLock().unlock();
+ }
} finally {
- splitsAndClosesLock.readLock().unlock();
+ synchronized (writestate) {
+ writestate.flushing = false;
+ this.writestate.flushRequested = false;
+ writestate.notifyAll();
+ }
}
} finally {
- synchronized (writestate) {
- writestate.flushing = false;
- this.writestate.flushRequested = false;
- writestate.notifyAll();
- status.cleanup();
- }
+ // prevent MonitoredTask leaks due to thrown exceptions
+ status.cleanup();
}
}
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1181938&r1=1181937&r2=1181938&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Tue Oct 11 17:43:05 2011
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
@@ -655,25 +656,27 @@ public class Store implements HeapSize {
StoreFile.Writer writer = compactStores(filesToCompact, cr.isMajor(), maxId);
// Move the compaction into place.
sf = completeCompaction(filesToCompact, writer);
+
+ // Report that the compaction is complete.
+ status.markComplete("Completed compaction");
+ LOG.info("Completed" + (cr.isMajor() ? " major " : " ") + "compaction of "
+ + filesToCompact.size() + " file(s) in " + this.storeNameStr + " of "
+ + this.region.getRegionInfo().getRegionNameAsString()
+ + "; new storefile name=" + (sf == null ? "none" : sf.toString())
+ + ", size=" + (sf == null ? "none" :
+ StringUtils.humanReadableInt(sf.getReader().length()))
+ + "; total size for store is "
+ + StringUtils.humanReadableInt(storeSize));
} catch (IOException ioe) {
// rather than leak the status, we abort here, then rethrow the exception
- status.abort("IOException thrown");
+ status.abort(StringUtils.stringifyException(ioe));
throw ioe;
} finally {
synchronized (filesCompacting) {
filesCompacting.removeAll(filesToCompact);
}
+ status.cleanup();
}
-
- status.markComplete("Completed compaction");
- LOG.info("Completed" + (cr.isMajor() ? " major " : " ") + "compaction of "
- + filesToCompact.size() + " file(s) in " + this.storeNameStr + " of "
- + this.region.getRegionInfo().getRegionNameAsString()
- + "; new storefile name=" + (sf == null ? "none" : sf.toString())
- + ", size=" + (sf == null ? "none" :
- StringUtils.humanReadableInt(sf.getReader().length()))
- + "; total size for store is "
- + StringUtils.humanReadableInt(storeSize));
}
/*
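
The Store.java hunk above also tightens the failure path: before rethrowing,
the task is aborted with the stringified exception instead of the generic
"IOException thrown" message, and the success reporting moves inside the try
so it cannot run after a failure. A condensed sketch of that abort-and-rethrow
shape (doCompaction() is a hypothetical stand-in for
compactStores()/completeCompaction()):

    import java.io.IOException;
    import org.apache.hadoop.hbase.monitoring.MonitoredTask;
    import org.apache.hadoop.hbase.monitoring.TaskMonitor;
    import org.apache.hadoop.util.StringUtils;

    public class AbortAndRethrow {
      public static void main(String[] args) throws IOException {
        MonitoredTask status =
            TaskMonitor.get().createStatus("Example compaction");
        try {
          doCompaction();
          status.markComplete("Completed compaction");
        } catch (IOException ioe) {
          // Record the actual failure on the task, then rethrow to the caller.
          status.abort(StringUtils.stringifyException(ioe));
          throw ioe;
        } finally {
          status.cleanup();
        }
      }

      // Hypothetical stand-in; throws to exercise the abort path.
      private static void doCompaction() throws IOException {
        throw new IOException("simulated compaction failure");
      }
    }
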
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1181938&r1=1181937&r2=1181938&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Tue Oct 11 17:43:05 2011
@@ -1309,45 +1309,51 @@ public class HLog implements Syncable {
throws IOException {
MonitoredTask status = TaskMonitor.get().createStatus(
"Splitting logs in " + srcDir);
- long startTime = System.currentTimeMillis();
- List<Path> splits = null;
- status.setStatus("Determining files to split");
- if (!fs.exists(srcDir)) {
- // Nothing to do
- status.markComplete("No log directory existed to split.");
- return splits;
- }
- FileStatus [] logfiles = fs.listStatus(srcDir);
- if (logfiles == null || logfiles.length == 0) {
- // Nothing to do
+ try {
+ long startTime = System.currentTimeMillis();
+ List<Path> splits = null;
+ status.setStatus("Determining files to split");
+ if (!fs.exists(srcDir)) {
+ // Nothing to do
+ status.markComplete("No log directory existed to split.");
+ return splits;
+ }
+ FileStatus [] logfiles = fs.listStatus(srcDir);
+ if (logfiles == null || logfiles.length == 0) {
+ // Nothing to do
+ status.markComplete("No log files existed to split.");
+ return splits;
+ }
+ LOG.info("Splitting " + logfiles.length + " hlog(s) in " +
+ srcDir.toString());
+ status.setStatus("Performing split");
+ splits = splitLog(rootDir, srcDir, oldLogDir, logfiles, fs, conf);
+ try {
+ FileStatus[] files = fs.listStatus(srcDir);
+ for(FileStatus file : files) {
+ Path newPath = getHLogArchivePath(oldLogDir, file.getPath());
+ LOG.info("Moving " + FSUtils.getPath(file.getPath()) + " to " +
+ FSUtils.getPath(newPath));
+ fs.rename(file.getPath(), newPath);
+ }
+ LOG.debug("Moved " + files.length + " log files to " +
+ FSUtils.getPath(oldLogDir));
+ fs.delete(srcDir, true);
+ } catch (IOException e) {
+ e = RemoteExceptionHandler.checkIOException(e);
+ IOException io = new IOException("Cannot delete: " + srcDir);
+ io.initCause(e);
+ throw io;
+ }
+ lastSplitTime = System.currentTimeMillis() - startTime;
+ status.markComplete("Log splits complete.");
+ LOG.info("hlog file splitting completed in " + lastSplitTime +
+ " ms for " + srcDir.toString());
return splits;
+ } finally {
+ // prevent MonitoredTask leaks due to thrown exceptions
+ status.cleanup();
}
- LOG.info("Splitting " + logfiles.length + " hlog(s) in " +
- srcDir.toString());
- status.setStatus("Performing split");
- splits = splitLog(rootDir, srcDir, oldLogDir, logfiles, fs, conf);
- try {
- FileStatus[] files = fs.listStatus(srcDir);
- for(FileStatus file : files) {
- Path newPath = getHLogArchivePath(oldLogDir, file.getPath());
- LOG.info("Moving " + FSUtils.getPath(file.getPath()) + " to " +
- FSUtils.getPath(newPath));
- fs.rename(file.getPath(), newPath);
- }
- LOG.debug("Moved " + files.length + " log files to " +
- FSUtils.getPath(oldLogDir));
- fs.delete(srcDir, true);
- } catch (IOException e) {
- e = RemoteExceptionHandler.checkIOException(e);
- IOException io = new IOException("Cannot delete: " + srcDir);
- io.initCause(e);
- throw io;
- }
- lastSplitTime = System.currentTimeMillis() - startTime;
- status.markComplete("Log splits complete.");
- LOG.info("hlog file splitting completed in " + lastSplitTime +
- " ms for " + srcDir.toString());
- return splits;
}
// Private immutable datastructure to hold Writer and its Path.
@@ -1417,79 +1423,87 @@ public class HLog implements Syncable {
MonitoredTask status = TaskMonitor.get().createStatus(
"Splitting logs in " + srcDir);
- // Number of logs in a read batch
- // More means faster but bigger mem consumption
- //TODO make a note on the conf rename and update hbase-site.xml if needed
- int logFilesPerStep = conf.getInt("hbase.hlog.split.batch.size", 3);
- boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", false);
+ try {
+ // Number of logs in a read batch
+ // More means faster but bigger mem consumption
+ //TODO make a note on the conf rename and update hbase-site.xml if needed
+ int logFilesPerStep = conf.getInt("hbase.hlog.split.batch.size", 3);
+ boolean skipErrors =
+ conf.getBoolean("hbase.hlog.split.skip.errors", false);
- lastSplitSize = 0;
+ lastSplitSize = 0;
- status.setStatus("Performing split");
- try {
- int i = -1;
- while (i < logfiles.length) {
- final Map<byte[], LinkedList<Entry>> editsByRegion =
- new TreeMap<byte[], LinkedList<Entry>>(Bytes.BYTES_COMPARATOR);
- for (int j = 0; j < logFilesPerStep; j++) {
- i++;
- if (i == logfiles.length) {
- break;
- }
- FileStatus log = logfiles[i];
- Path logPath = log.getPath();
- long logLength = log.getLen();
- lastSplitSize += logLength;
- LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
- ": " + logPath + ", length=" + logLength );
- try {
- recoverFileLease(fs, logPath, conf);
- parseHLog(log, editsByRegion, fs, conf);
- processedLogs.add(logPath);
- } catch (EOFException eof) {
- // truncated files are expected if a RS crashes (see HBASE-2643)
- LOG.warn("EOF from hlog " + logPath + ". continuing");
- processedLogs.add(logPath);
- } catch (InterruptedIOException iioe) {
- status.abort(StringUtils.stringifyException(iioe));
- throw iioe;
- } catch (IOException e) {
- // If the IOE resulted from bad file format,
- // then this problem is idempotent and retrying won't help
- if (e.getCause() instanceof ParseException) {
- LOG.warn("ParseException from hlog " + logPath + ". continuing");
+ status.setStatus("Performing split");
+ try {
+ int i = -1;
+ while (i < logfiles.length) {
+ final Map<byte[], LinkedList<Entry>> editsByRegion =
+ new TreeMap<byte[], LinkedList<Entry>>(Bytes.BYTES_COMPARATOR);
+ for (int j = 0; j < logFilesPerStep; j++) {
+ i++;
+ if (i == logfiles.length) {
+ break;
+ }
+ FileStatus log = logfiles[i];
+ Path logPath = log.getPath();
+ long logLength = log.getLen();
+ lastSplitSize += logLength;
+ LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
+ ": " + logPath + ", length=" + logLength );
+ try {
+ recoverFileLease(fs, logPath, conf);
+ parseHLog(log, editsByRegion, fs, conf);
+ processedLogs.add(logPath);
+ } catch (EOFException eof) {
+ // truncated files are expected if a RS crashes (see HBASE-2643)
+ LOG.warn("EOF from hlog " + logPath + ". continuing");
processedLogs.add(logPath);
- } else {
- if (skipErrors) {
- LOG.info("Got while parsing hlog " + logPath +
- ". Marking as corrupted", e);
- corruptedLogs.add(logPath);
+ } catch (InterruptedIOException iioe) {
+ status.abort(StringUtils.stringifyException(iioe));
+ throw iioe;
+ } catch (IOException e) {
+ // If the IOE resulted from bad file format,
+ // then this problem is idempotent and retrying won't help
+ if (e.getCause() instanceof ParseException) {
+ LOG.warn("ParseException from hlog " +
+ logPath + ". continuing");
+ processedLogs.add(logPath);
} else {
- status.abort(StringUtils.stringifyException(e));
- throw e;
+ if (skipErrors) {
+ LOG.info("Got while parsing hlog " + logPath +
+ ". Marking as corrupted", e);
+ corruptedLogs.add(logPath);
+ } else {
+ status.abort(StringUtils.stringifyException(e));
+ throw e;
+ }
}
}
}
+ writeEditsBatchToRegions(editsByRegion, logWriters,
+ rootDir, fs, conf);
+ }
+ if (fs.listStatus(srcDir).length > processedLogs.size() +
+ corruptedLogs.size()) {
+ status.abort("Discovered orphan hlog after split");
+ throw new IOException("Discovered orphan hlog after split. Maybe " +
+ "HRegionServer was not dead when we started");
+ }
+ } finally {
+ splits = new ArrayList<Path>(logWriters.size());
+ for (WriterAndPath wap : logWriters.values()) {
+ wap.w.close();
+ splits.add(wap.p);
+ LOG.debug("Closed " + wap.p);
}
- writeEditsBatchToRegions(editsByRegion, logWriters, rootDir, fs, conf);
- }
- if (fs.listStatus(srcDir).length > processedLogs.size() + corruptedLogs.size()) {
- status.abort("Discovered orphan hlog after split");
- throw new IOException("Discovered orphan hlog after split. Maybe " +
- "HRegionServer was not dead when we started");
}
+ status.setStatus("Archiving logs after completed split");
+ archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf);
+ status.markComplete("Split completed");
+ return splits;
} finally {
- splits = new ArrayList<Path>(logWriters.size());
- for (WriterAndPath wap : logWriters.values()) {
- wap.w.close();
- splits.add(wap.p);
- LOG.debug("Closed " + wap.p);
- }
+ status.cleanup();
}
- status.setStatus("Archiving logs after completed split");
- archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf);
- status.markComplete("Split completed");
- return splits;
}
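
A last note on ordering: in flushcache() and the splitLog() methods above,
cleanup() sits in the outermost finally, so the inner finally blocks
(releasing the read lock, resetting writestate.flushing, closing the
recovered-edits writers) run first and the task is released last, on every
path. A minimal sketch of that nesting (the lock and doFlush() are
hypothetical stand-ins):

    import java.util.concurrent.locks.ReentrantReadWriteLock;
    import org.apache.hadoop.hbase.monitoring.MonitoredTask;
    import org.apache.hadoop.hbase.monitoring.TaskMonitor;

    public class NestedFinallyOrdering {
      private static final ReentrantReadWriteLock lock =
          new ReentrantReadWriteLock();

      public static void main(String[] args) {
        MonitoredTask status = TaskMonitor.get().createStatus("Example flush");
        try {
          status.setStatus("Acquiring readlock");
          lock.readLock().lock();
          try {
            doFlush(); // the guarded work
            status.markComplete("Flush successful");
          } finally {
            lock.readLock().unlock(); // inner finally runs first
          }
        } finally {
          status.cleanup(); // task released last, on every exit path
        }
      }

      // Hypothetical stand-in for internalFlushcache().
      private static void doFlush() {
      }
    }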