Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 19:43:05 UTC

svn commit: r1181938 - in /hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver: HRegion.java Store.java wal/HLog.java

Author: nspiegelberg
Date: Tue Oct 11 17:43:05 2011
New Revision: 1181938

URL: http://svn.apache.org/viewvc?rev=1181938&view=rev
Log:
Prevent leaks of MonitoredTasks

Summary: Added a try ... finally failsafe around each use of a MonitoredTask that
was not already cleaned up that way, to guard against leaks.
Test Plan: Compiled and run on dev server.
Reviewed By: nspiegelberg
Reviewers: nspiegelberg, kannan
CC: hbase@lists, nspiegelberg
Differential Revision: 305954
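
The fix applies one guard pattern at every TaskMonitor.get().createStatus() call
site. Below is a minimal sketch of that pattern, assuming the MonitoredTask API as
it appears in the diff (setStatus, markComplete, abort, cleanup) and the monitoring
package paths from the 0.89 tree; doWork() and the surrounding class are
hypothetical placeholders, not code from this commit:

    import java.io.IOException;
    import org.apache.hadoop.hbase.monitoring.MonitoredTask;
    import org.apache.hadoop.hbase.monitoring.TaskMonitor;
    import org.apache.hadoop.util.StringUtils;

    // Guard pattern applied throughout this commit: the finally block
    // releases the MonitoredTask on every exit path, so an exception in
    // the monitored body can no longer leak the task.
    public class MonitoredTaskGuard {
      void guardedOperation() throws IOException {
        MonitoredTask status = TaskMonitor.get().createStatus("Doing work");
        try {
          status.setStatus("Working");
          doWork();                          // hypothetical; may throw
          status.markComplete("Work done");  // normal completion
        } catch (IOException ioe) {
          // record the cause on the task, then rethrow
          status.abort(StringUtils.stringifyException(ioe));
          throw ioe;
        } finally {
          // prevent MonitoredTask leaks due to thrown exceptions
          status.cleanup();
        }
      }

      private void doWork() throws IOException { /* placeholder */ }
    }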

Modified:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1181938&r1=1181937&r2=1181938&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Oct 11 17:43:05 2011
@@ -475,49 +475,54 @@ public class HRegion implements HeapSize
   throws IOException {
     MonitoredTask status = TaskMonitor.get().createStatus(
         "Initializing region " + this);
-    // Write HRI to a file in case we need to recover .META.
-    status.setStatus("Writing region info on filesystem");
-    checkRegioninfoOnFilesystem();
-
-    // Remove temporary data left over from old regions
-    status.setStatus("Cleaning up temporary data from old regions");
-    cleanupTmpDir();
-
-    // Load in all the HStores.  Get maximum seqid.
-    long maxSeqId = -1;
-    for (HColumnDescriptor c : this.regionInfo.getTableDesc().getFamilies()) {
-      status.setStatus("Instantiating store for column family " + c);
-      Store store = instantiateHStore(this.tableDir, c);
-      this.stores.put(c.getName(), store);
-      long storeSeqId = store.getMaxSequenceId();
-      if (storeSeqId > maxSeqId) {
-        maxSeqId = storeSeqId;
-      }
-    }
-    // Recover any edits if available.
-    maxSeqId = replayRecoveredEditsIfAny(
-        this.regiondir, maxSeqId, reporter, status);
-
-    // Get rid of any splits or merges that were lost in-progress.  Clean out
-    // these directories here on open.  We may be opening a region that was
-    // being split but we crashed in the middle of it all.
-    status.setStatus("Cleaning up detritus from prior splits");
-    FSUtils.deleteDirectory(this.fs, new Path(regiondir, SPLITDIR));
-    FSUtils.deleteDirectory(this.fs, new Path(regiondir, MERGEDIR));
-
-    // See if region is meant to run read-only.
-    if (this.regionInfo.getTableDesc().isReadOnly()) {
-      this.writestate.setReadOnly(true);
-    }
-
-    this.writestate.compacting = 0;
-    this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
-    // Use maximum of log sequenceid or that which was found in stores
-    // (particularly if no recovered edits, seqid will be -1).
-    long nextSeqid = maxSeqId + 1;
-    LOG.info("Onlined " + this.toString() + "; next sequenceid=" + nextSeqid);
-    status.markComplete("Region opened successfully");
-    return nextSeqid;
+    try {
+      // Write HRI to a file in case we need to recover .META.
+      status.setStatus("Writing region info on filesystem");
+      checkRegioninfoOnFilesystem();
+
+      // Remove temporary data left over from old regions
+      status.setStatus("Cleaning up temporary data from old regions");
+      cleanupTmpDir();
+
+      // Load in all the HStores.  Get maximum seqid.
+      long maxSeqId = -1;
+      for (HColumnDescriptor c : this.regionInfo.getTableDesc().getFamilies()) {
+        status.setStatus("Instantiating store for column family " + c);
+        Store store = instantiateHStore(this.tableDir, c);
+        this.stores.put(c.getName(), store);
+        long storeSeqId = store.getMaxSequenceId();
+        if (storeSeqId > maxSeqId) {
+          maxSeqId = storeSeqId;
+        }
+      }
+      // Recover any edits if available.
+      maxSeqId = replayRecoveredEditsIfAny(
+          this.regiondir, maxSeqId, reporter, status);
+
+      // Get rid of any splits or merges that were lost in-progress.  Clean out
+      // these directories here on open.  We may be opening a region that was
+      // being split but we crashed in the middle of it all.
+      status.setStatus("Cleaning up detritus from prior splits");
+      FSUtils.deleteDirectory(this.fs, new Path(regiondir, SPLITDIR));
+      FSUtils.deleteDirectory(this.fs, new Path(regiondir, MERGEDIR));
+
+      // See if region is meant to run read-only.
+      if (this.regionInfo.getTableDesc().isReadOnly()) {
+        this.writestate.setReadOnly(true);
+      }
+
+      this.writestate.compacting = 0;
+      this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
+      // Use maximum of log sequenceid or that which was found in stores
+      // (particularly if no recovered edits, seqid will be -1).
+      long nextSeqid = maxSeqId + 1;
+      LOG.info("Onlined " + this.toString() + "; next sequenceid=" + nextSeqid);
+      status.markComplete("Region opened successfully");
+      return nextSeqid;
+    } finally {
+      // prevent MonitoredTask leaks due to thrown exceptions
+      status.cleanup();
+    }
   }
 
   /*
@@ -725,6 +730,8 @@ public class HRegion implements HeapSize
         }
       } finally {
         newScannerLock.writeLock().unlock();
+        // prevent MonitoredTask leaks due to thrown exceptions
+        status.cleanup();
       }
     }
   }
@@ -1103,43 +1110,47 @@ public class HRegion implements HeapSize
    */
   public boolean flushcache() throws IOException {
    MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
-    if (this.closed.get()) {
-      status.abort("Skipped: closed");
-      return false;
-    }
-    synchronized (writestate) {
-      if (!writestate.flushing && writestate.writesEnabled) {
-        this.writestate.flushing = true;
-      } else {
-        if(LOG.isDebugEnabled()) {
-          LOG.debug("NOT flushing memstore for region " + this +
-            ", flushing=" +
-              writestate.flushing + ", writesEnabled=" +
-              writestate.writesEnabled);
-        }
-        status.abort("Not flushing since " + (writestate.flushing ?
-              "already flushing" : "writes not enabled"));
+    try {
+      if (this.closed.get()) {
+        status.abort("Skipped: closed");
         return false;
       }
-    }
-    try {
-      // Prevent splits and closes
-      status.setStatus("Acquiring readlock on region");
-      splitsAndClosesLock.readLock().lock();
+      synchronized (writestate) {
+        if (!writestate.flushing && writestate.writesEnabled) {
+          this.writestate.flushing = true;
+        } else {
+          if(LOG.isDebugEnabled()) {
+            LOG.debug("NOT flushing memstore for region " + this +
+              ", flushing=" +
+                writestate.flushing + ", writesEnabled=" +
+                writestate.writesEnabled);
+          }
+          status.abort("Not flushing since " + (writestate.flushing ?
+                "already flushing" : "writes not enabled"));
+          return false;
+        }
+      }
       try {
-        boolean result = internalFlushcache(status);
-        status.markComplete("Flush successful");
-        return result;
+        // Prevent splits and closes
+        status.setStatus("Acquiring readlock on region");
+        splitsAndClosesLock.readLock().lock();
+        try {
+          boolean result = internalFlushcache(status);
+          status.markComplete("Flush successful");
+          return result;
+        } finally {
+          splitsAndClosesLock.readLock().unlock();
+        }
       } finally {
-        splitsAndClosesLock.readLock().unlock();
+        synchronized (writestate) {
+          writestate.flushing = false;
+          this.writestate.flushRequested = false;
+          writestate.notifyAll();
+        }
       }
     } finally {
-      synchronized (writestate) {
-        writestate.flushing = false;
-        this.writestate.flushRequested = false;
-        writestate.notifyAll();
-        status.cleanup();
-      }
+      // prevent MonitoredTask leaks due to thrown exceptions
+      status.cleanup();
     }
   }
 

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1181938&r1=1181937&r2=1181938&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Tue Oct 11 17:43:05 2011
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -655,25 +656,27 @@ public class Store implements HeapSize {
       StoreFile.Writer writer = compactStores(filesToCompact, cr.isMajor(), maxId);
       // Move the compaction into place.
       sf = completeCompaction(filesToCompact, writer);
+
+      // Report that the compaction is complete.
+      status.markComplete("Completed compaction");
+      LOG.info("Completed" + (cr.isMajor() ? " major " : " ") + "compaction of "
+          + filesToCompact.size() + " file(s) in " + this.storeNameStr + " of "
+          + this.region.getRegionInfo().getRegionNameAsString()
+          + "; new storefile name=" + (sf == null ? "none" : sf.toString())
+          + ", size=" + (sf == null ? "none" :
+            StringUtils.humanReadableInt(sf.getReader().length()))
+          + "; total size for store is "
+          + StringUtils.humanReadableInt(storeSize));
     } catch (IOException ioe) {
       // rather than leak the status, we abort here, then rethrow the exception
-      status.abort("IOException thrown");
+      status.abort(StringUtils.stringifyException(ioe));
       throw ioe;
     } finally {
       synchronized (filesCompacting) {
         filesCompacting.removeAll(filesToCompact);
       }
+      status.cleanup();
     }
-
-    status.markComplete("Completed compaction");
-    LOG.info("Completed" + (cr.isMajor() ? " major " : " ") + "compaction of "
-        + filesToCompact.size() + " file(s) in " + this.storeNameStr + " of "
-        + this.region.getRegionInfo().getRegionNameAsString()
-        + "; new storefile name=" + (sf == null ? "none" : sf.toString())
-        + ", size=" + (sf == null ? "none" :
-          StringUtils.humanReadableInt(sf.getReader().length()))
-        + "; total size for store is "
-        + StringUtils.humanReadableInt(storeSize));
   }
 
   /*

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1181938&r1=1181937&r2=1181938&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Tue Oct 11 17:43:05 2011
@@ -1309,45 +1309,51 @@ public class HLog implements Syncable {
   throws IOException {
     MonitoredTask status = TaskMonitor.get().createStatus(
         "Splitting logs in " + srcDir);
-    long startTime = System.currentTimeMillis();
-    List<Path> splits = null;
-    status.setStatus("Determining files to split");
-    if (!fs.exists(srcDir)) {
-      // Nothing to do
-      status.markComplete("No log directory existed to split.");
-      return splits;
-    }
-    FileStatus [] logfiles = fs.listStatus(srcDir);
-    if (logfiles == null || logfiles.length == 0) {
-      // Nothing to do
+    try {
+      long startTime = System.currentTimeMillis();
+      List<Path> splits = null;
+      status.setStatus("Determining files to split");
+      if (!fs.exists(srcDir)) {
+        // Nothing to do
+        status.markComplete("No log directory existed to split.");
+        return splits;
+      }
+      FileStatus [] logfiles = fs.listStatus(srcDir);
+      if (logfiles == null || logfiles.length == 0) {
+        // Nothing to do
+        status.markComplete("No log files existed to split.");
+        return splits;
+      }
+      LOG.info("Splitting " + logfiles.length + " hlog(s) in " +
+        srcDir.toString());
+      status.setStatus("Performing split");
+      splits = splitLog(rootDir, srcDir, oldLogDir, logfiles, fs, conf);
+      try {
+        FileStatus[] files = fs.listStatus(srcDir);
+        for(FileStatus file : files) {
+          Path newPath = getHLogArchivePath(oldLogDir, file.getPath());
+          LOG.info("Moving " +  FSUtils.getPath(file.getPath()) + " to " +
+                     FSUtils.getPath(newPath));
+          fs.rename(file.getPath(), newPath);
+        }
+        LOG.debug("Moved " + files.length + " log files to " +
+          FSUtils.getPath(oldLogDir));
+        fs.delete(srcDir, true);
+      } catch (IOException e) {
+        e = RemoteExceptionHandler.checkIOException(e);
+        IOException io = new IOException("Cannot delete: " + srcDir);
+        io.initCause(e);
+        throw io;
+      }
+      lastSplitTime = System.currentTimeMillis() - startTime;
+      status.markComplete("Log splits complete.");
+      LOG.info("hlog file splitting completed in " + lastSplitTime +
+          " ms for " + srcDir.toString());
       return splits;
+    } finally {
+      // prevent MonitoredTask leaks due to thrown exceptions
+      status.cleanup();
     }
-    LOG.info("Splitting " + logfiles.length + " hlog(s) in " +
-      srcDir.toString());
-    status.setStatus("Performing split");
-    splits = splitLog(rootDir, srcDir, oldLogDir, logfiles, fs, conf);
-    try {
-      FileStatus[] files = fs.listStatus(srcDir);
-      for(FileStatus file : files) {
-        Path newPath = getHLogArchivePath(oldLogDir, file.getPath());
-        LOG.info("Moving " +  FSUtils.getPath(file.getPath()) + " to " +
-                   FSUtils.getPath(newPath));
-        fs.rename(file.getPath(), newPath);
-      }
-      LOG.debug("Moved " + files.length + " log files to " +
-        FSUtils.getPath(oldLogDir));
-      fs.delete(srcDir, true);
-    } catch (IOException e) {
-      e = RemoteExceptionHandler.checkIOException(e);
-      IOException io = new IOException("Cannot delete: " + srcDir);
-      io.initCause(e);
-      throw io;
-    }
-    lastSplitTime = System.currentTimeMillis() - startTime;
-    status.markComplete("Log splits complete.");
-    LOG.info("hlog file splitting completed in " + lastSplitTime +
-        " ms for " + srcDir.toString());
-    return splits;
   }
 
   // Private immutable datastructure to hold Writer and its Path.
@@ -1417,79 +1423,87 @@ public class HLog implements Syncable {
     MonitoredTask status = TaskMonitor.get().createStatus(
         "Splitting logs in " + srcDir);
 
-    // Number of logs in a read batch
-    // More means faster but bigger mem consumption
-    //TODO make a note on the conf rename and update hbase-site.xml if needed
-    int logFilesPerStep = conf.getInt("hbase.hlog.split.batch.size", 3);
-    boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", false);
+    try {
+      // Number of logs in a read batch
+      // More means faster but bigger mem consumption
+      //TODO make a note on the conf rename and update hbase-site.xml if needed
+      int logFilesPerStep = conf.getInt("hbase.hlog.split.batch.size", 3);
+      boolean skipErrors =
+        conf.getBoolean("hbase.hlog.split.skip.errors", false);
 
-    lastSplitSize = 0;
+      lastSplitSize = 0;
 
-    status.setStatus("Performing split");
-    try {
-      int i = -1;
-      while (i < logfiles.length) {
-        final Map<byte[], LinkedList<Entry>> editsByRegion =
-          new TreeMap<byte[], LinkedList<Entry>>(Bytes.BYTES_COMPARATOR);
-        for (int j = 0; j < logFilesPerStep; j++) {
-          i++;
-          if (i == logfiles.length) {
-            break;
-          }
-          FileStatus log = logfiles[i];
-          Path logPath = log.getPath();
-          long logLength = log.getLen();
-          lastSplitSize += logLength;
-          LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
-            ": " + logPath + ", length=" + logLength );
-          try {
-            recoverFileLease(fs, logPath, conf);
-            parseHLog(log, editsByRegion, fs, conf);
-            processedLogs.add(logPath);
-          } catch (EOFException eof) {
-            // truncated files are expected if a RS crashes (see HBASE-2643)
-            LOG.warn("EOF from hlog " + logPath + ".  continuing");
-            processedLogs.add(logPath);
-          } catch (InterruptedIOException iioe) {
-            status.abort(StringUtils.stringifyException(iioe));
-            throw iioe;
-          } catch (IOException e) {
-            // If the IOE resulted from bad file format,
-            // then this problem is idempotent and retrying won't help
-            if (e.getCause() instanceof ParseException) {
-              LOG.warn("ParseException from hlog " + logPath + ".  continuing");
+      status.setStatus("Performing split");
+      try {
+        int i = -1;
+        while (i < logfiles.length) {
+          final Map<byte[], LinkedList<Entry>> editsByRegion =
+            new TreeMap<byte[], LinkedList<Entry>>(Bytes.BYTES_COMPARATOR);
+          for (int j = 0; j < logFilesPerStep; j++) {
+            i++;
+            if (i == logfiles.length) {
+              break;
+            }
+            FileStatus log = logfiles[i];
+            Path logPath = log.getPath();
+            long logLength = log.getLen();
+            lastSplitSize += logLength;
+            LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
+              ": " + logPath + ", length=" + logLength );
+            try {
+              recoverFileLease(fs, logPath, conf);
+              parseHLog(log, editsByRegion, fs, conf);
+              processedLogs.add(logPath);
+            } catch (EOFException eof) {
+              // truncated files are expected if a RS crashes (see HBASE-2643)
+              LOG.warn("EOF from hlog " + logPath + ".  continuing");
               processedLogs.add(logPath);
-            } else {
-              if (skipErrors) {
-                LOG.info("Got while parsing hlog " + logPath +
-                  ". Marking as corrupted", e);
-                corruptedLogs.add(logPath);
+            } catch (InterruptedIOException iioe) {
+              status.abort(StringUtils.stringifyException(iioe));
+              throw iioe;
+            } catch (IOException e) {
+              // If the IOE resulted from bad file format,
+              // then this problem is idempotent and retrying won't help
+              if (e.getCause() instanceof ParseException) {
+                LOG.warn("ParseException from hlog " +
+                    logPath + ".  continuing");
+                processedLogs.add(logPath);
               } else {
-                status.abort(StringUtils.stringifyException(e));
-                throw e;
+                if (skipErrors) {
+                  LOG.info("Got while parsing hlog " + logPath +
+                    ". Marking as corrupted", e);
+                  corruptedLogs.add(logPath);
+                } else {
+                  status.abort(StringUtils.stringifyException(e));
+                  throw e;
+                }
               }
             }
           }
+          writeEditsBatchToRegions(editsByRegion, logWriters,
+              rootDir, fs, conf);
+        }
+        if (fs.listStatus(srcDir).length > processedLogs.size() +
+            corruptedLogs.size()) {
+          status.abort("Discovered orphan hlog after split");
+          throw new IOException("Discovered orphan hlog after split. Maybe " +
+            "HRegionServer was not dead when we started");
+        }
+      } finally {
+        splits = new ArrayList<Path>(logWriters.size());
+        for (WriterAndPath wap : logWriters.values()) {
+          wap.w.close();
+          splits.add(wap.p);
+          LOG.debug("Closed " + wap.p);
         }
-        writeEditsBatchToRegions(editsByRegion, logWriters, rootDir, fs, conf);
-      }
-      if (fs.listStatus(srcDir).length > processedLogs.size() + corruptedLogs.size()) {
-        status.abort("Discovered orphan hlog after split");
-        throw new IOException("Discovered orphan hlog after split. Maybe " +
-          "HRegionServer was not dead when we started");
       }
+      status.setStatus("Archiving logs after completed split");
+      archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf);
+      status.markComplete("Split completed");
+      return splits;
     } finally {
-      splits = new ArrayList<Path>(logWriters.size());
-      for (WriterAndPath wap : logWriters.values()) {
-        wap.w.close();
-        splits.add(wap.p);
-        LOG.debug("Closed " + wap.p);
-      }
+      status.cleanup();
     }
-    status.setStatus("Archiving logs after completed split");
-    archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf);
-    status.markComplete("Split completed");
-    return splits;
   }