Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:07:05 UTC

svn commit: r1181421 - /hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java

Author: nspiegelberg
Date: Tue Oct 11 02:07:04 2011
New Revision: 1181421

URL: http://svn.apache.org/viewvc?rev=1181421&view=rev
Log:
HBASE-2462: New Compaction Algorithm

Summary:
1. adds a threshold (minCompactSize) below which files always qualify for compaction
2. uses a running sum of file sizes for the compaction-ratio comparison (sketched below)
3. handles sf.count() > maxFilesToCompact better
4. detects when a minor compaction can be promoted to a major compaction

Test Plan:
mvn clean install -DskipTests

DiffCamp Revision: 174446
Reviewed By: kannan
Commenters: aravind, jgray
CC: jgray, nspiegelberg, kannan, aravind, hbase@lists
Revert Plan:
OK
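
To illustrate the new selection rule (item 2 above), here is a minimal,
self-contained sketch.  It is illustrative only: the class and method names are
mine, and the real logic, including the compactionThreshold skip and the
promotion to a major compaction when every file qualifies, is in the Store.java
diff below.  Files are assumed ordered oldest to newest, as in the "normal
skew" comment in the patch.

  public final class CompactSelectionSketch {
    /**
     * Walks from the newest file toward the oldest, keeping a running sum of
     * the sizes selected so far.  A file stays in the selection while its size
     * is at most max(minCompactSize, compactRatio * sumOfNewerSelectedFiles);
     * once a file is too large, every older file is assumed larger still
     * (normal skew), so the walk stops.  Returns {start, end}, end exclusive.
     */
    public static int[] select(long[] fileSizes, long minCompactSize,
        double compactRatio, int maxFilesToCompact) {
      int end = fileSizes.length;
      int start = end;
      long totalSize = 0;
      while (start > 0) {
        if (fileSizes[start - 1] >
            Math.max(minCompactSize, (long) (totalSize * compactRatio))) {
          break;  // too large to include; under normal skew, older files are too
        }
        --start;
        totalSize += fileSizes[start];
        // cap the file count; drop the newest so older, larger files are favored
        if (end - start > maxFilesToCompact) {
          totalSize -= fileSizes[--end];
        }
      }
      return new int[] { start, end };
    }
  }

For example, with the default compactRatio of 1.2, once the running sum of
already-selected newer files reaches 200 MB, the next (older) file is included
only if it is at most max(minCompactSize, 240 MB).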

Modified:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1181421&r1=1181420&r2=1181421&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Tue Oct 11 02:07:04 2011
@@ -93,7 +93,11 @@ public class Store implements HeapSize {
   // ttl in milliseconds.
   protected long ttl;
   private long majorCompactionTime;
-  private int maxFilesToCompact;
+  private final int maxFilesToCompact;
+  private final long minCompactSize;
+  // compactRatio: double on purpose!  Float.MAX < Long.MAX < Double.MAX
+  // With a float, Java would convert the long to float for the comparison, losing precision
+  private double compactRatio;
   private long lastCompactSize = 0;
   /* how many bytes to write between status checks */
   static int closeCheckInterval = 0;
@@ -191,6 +195,9 @@ public class Store implements HeapSize {
     }
 
     this.maxFilesToCompact = conf.getInt("hbase.hstore.compaction.max", 10);
+    this.minCompactSize = this.region.memstoreFlushSize * 3 / 2; // +50% pad
+    this.compactRatio = conf.getFloat("hbase.hstore.compaction.ratio", 1.2F);
+
     if (Store.closeCheckInterval == 0) {
       Store.closeCheckInterval = conf.getInt(
           "hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */);
@@ -596,9 +603,6 @@ public class Store implements HeapSize {
         return null;
       }
 
-      // Max-sequenceID is the last key of the storefiles TreeMap
-      long maxId = StoreFile.getMaxSequenceIdInList(storefiles);
-
       // Check to see if we need to do a major compaction on this region.
       // If so, change doMajorCompaction to true to skip the incremental
       // compacting below. Only check if doMajorCompaction is not true.
@@ -612,28 +616,39 @@ public class Store implements HeapSize {
         return checkSplit(forceSplit);
       }
 
-      // HBASE-745, preparing all store file sizes for incremental compacting
-      // selection.
+      // get store file sizes for incremental compacting selection.
+      /* normal skew:
+       *
+       *         older ----> newer
+       *     _
+       *    | |   _
+       *    | |  | |   _
+       *  --|-|- |-|- |-|---_-------_-------  minCompactSize
+       *    | |  | |  | |  | |  _  | |
+       *    | |  | |  | |  | | | | | |
+       *    | |  | |  | |  | | | | | |
+       */
+      boolean normalSkew = true;
       int countOfFiles = filesToCompact.size();
-      long totalSize = 0;
       long [] fileSizes = new long[countOfFiles];
-      long skipped = 0;
-      int point = 0;
       for (int i = 0; i < countOfFiles; i++) {
         StoreFile file = filesToCompact.get(i);
         Path path = file.getPath();
         if (path == null) {
-          LOG.warn("Path is null for " + file);
+          LOG.error("Path is null for " + file);
           return null;
         }
         StoreFile.Reader r = file.getReader();
         if (r == null) {
-          LOG.warn("StoreFile " + file + " has a null Reader");
+          LOG.error("StoreFile " + file + " has a null Reader");
           return null;
         }
-        long len = file.getReader().length();
-        fileSizes[i] = len;
-        totalSize += len;
+        fileSizes[i] = file.getReader().length();
+
+        // normal case: file[i] < file[i-1], unless file[i] is below minCompactSize
+        if (normalSkew && i > 0) {
+          normalSkew = fileSizes[i] < Math.max(fileSizes[i-1], minCompactSize);
+        }
       }
 
       // never run major compaction if we have too many files to avoid OOM
@@ -641,49 +656,86 @@ public class Store implements HeapSize {
         majorcompaction = false;
       }
 
+      long totalSize = 0;
       if (!majorcompaction && !references) {
-        // Here we select files for incremental compaction.
-        // The rule is: if the largest(oldest) one is more than twice the
-        // size of the second, skip the largest, and continue to next...,
-        // until we meet the compactionThreshold limit.
-
-        // A problem with the above heuristic is that we could go through all of
-        // filesToCompact and the above condition could hold for all files and
-        // we'd end up with nothing to compact.  To protect against this, we'll
-        // compact the tail -- up to the last 4 files -- of filesToCompact
-        // regardless.
-        int tail = Math.min(countOfFiles, 4);
-        for (point = 0; point < (countOfFiles - tail); point++) {
-          if (((fileSizes[point] < fileSizes[point + 1] * 2) &&
-               (countOfFiles - point) <= maxFilesToCompact)) {
-            break;
+        // we're doing a minor compaction, let's see what files are applicable
+        int start = countOfFiles;
+        int end = countOfFiles;
+        if (normalSkew) {
+          // Start at the newest file
+          while (start > 0) {
+            /* Stop when you find a file too big to consider for compaction.
+             * This means it is:
+             *   (1) an already-compacted, large file (i.e. > minCompactSize)
+             *   (2) compactRatio times larger than sum(other_compact_files)
+             * Given normal skew, any older files will also be too large
+             */
+            if (fileSizes[start-1] >
+                Math.max(minCompactSize, (long)(totalSize * compactRatio))) {
+              break;
+            }
+            // Include the tested file
+            --start;
+            totalSize += fileSizes[start];
+
+            // Only allow a certain number of files.
+            if (end - start > this.maxFilesToCompact) {
+              /* If fileSizes.length >> maxFilesToCompact, we will recurse on
+               * compact().  First consider the oldest files to avoid a
+               * situation where we always compact the last X and the last file
+               * becomes an aggregate of the previous compactions.
+               */
+              --end;
+              totalSize -= fileSizes[end];
+            }
           }
-          skipped += fileSizes[point];
+        } else {
+          // our normal operation skew is off. probably because someone
+          // imported a StoreFile.  unconditionally compact to restore order
+          start = Math.max(end - this.maxFilesToCompact, 0);
+          for (int pos = start; pos < end; ++pos) {
+            totalSize += fileSizes[pos];
+          }
+          LOG.info("StoreFiles in " + this.storeNameStr
+              + " do not follow normal operational skew.  "
+              + "Unconditionally compacting to restore order.");
         }
-        filesToCompact = new ArrayList<StoreFile>(filesToCompact.subList(point,
-          countOfFiles));
-        if (filesToCompact.size() <= 1) {
+
+        // if we don't have enough files to compact, just wait
+        if (end - start < this.compactionThreshold) {
           if (LOG.isDebugEnabled()) {
-            LOG.debug("Skipped compaction of 1 file; compaction size of " +
-              this.storeNameStr + ": " +
-              StringUtils.humanReadableInt(totalSize) + "; Skipped " + point +
-              " files, size: " + skipped);
+            LOG.debug("Skipped compaction of " + this.storeNameStr
+              + ".  Only " + (end - start) + " file(s) of size "
+              + StringUtils.humanReadableInt(totalSize)
+              + " meet compaction criteria.");
           }
           return checkSplit(forceSplit);
         }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Compaction size of " + this.storeNameStr + ": " +
-            StringUtils.humanReadableInt(totalSize) + "; Skipped " + point +
-            " file(s), size: " + skipped);
+
+        if (0 == start && end == countOfFiles) {
+          // we decided all the files were candidates! major compact
+          majorcompaction = true;
+        } else {
+          filesToCompact = new ArrayList<StoreFile>(filesToCompact.subList(start,
+            end));
+        }
+      } else {
+        // all files included in this compaction
+        for (long i : fileSizes) {
+          totalSize += i;
         }
       }
-      this.lastCompactSize = totalSize - skipped;
+      this.lastCompactSize = totalSize;
+
+      // Max-sequenceID is the last key in the files we're compacting
+      long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);
 
       // Ready to go.  Have list of files to compact.
       LOG.info("Started compaction of " + filesToCompact.size() + " file(s) in " +
           this.storeNameStr + " of " + this.region.getRegionInfo().getRegionNameAsString() +
         (references? ", hasReferences=true,": " ") + " into " +
-          region.getTmpDir() + ", seqid=" + maxId);
+          region.getTmpDir() + ", seqid=" + maxId +
+          ", totalSize=" + StringUtils.humanReadableInt(totalSize));
       StoreFile.Writer writer = compact(filesToCompact, majorcompaction, maxId);
       // Move the compaction into place.
       StoreFile sf = completeCompaction(filesToCompact, writer);
@@ -691,8 +743,9 @@ public class Store implements HeapSize {
         LOG.info("Completed" + (majorcompaction? " major ": " ") +
           "compaction of " + filesToCompact.size() + " file(s) in " +
           this.storeNameStr + " of " + this.region.getRegionInfo().getRegionNameAsString() +
-          "; new storefile is " + (sf == null? "none": sf.toString()) +
-          "; store size is " + StringUtils.humanReadableInt(storeSize));
+          "; new storefile name=" + (sf == null? "none": sf.toString()) +
+          ", size=" + (sf == null? "none": StringUtils.humanReadableInt(sf.getReader().length())) +
+          "; total size for store is " + StringUtils.humanReadableInt(storeSize));
       }
     }
     return checkSplit(forceSplit);
@@ -1431,8 +1484,8 @@ public class Store implements HeapSize {
   }
 
   public static final long FIXED_OVERHEAD = ClassSize.align(
-      ClassSize.OBJECT + (14 * ClassSize.REFERENCE) +
-      (5 * Bytes.SIZEOF_LONG) + (3 * Bytes.SIZEOF_INT) + (Bytes.SIZEOF_BOOLEAN * 2));
+      ClassSize.OBJECT + (14 * ClassSize.REFERENCE) + (1 * Bytes.SIZEOF_DOUBLE) +
+      (6 * Bytes.SIZEOF_LONG) + (3 * Bytes.SIZEOF_INT) + (Bytes.SIZEOF_BOOLEAN * 2));
 
   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
       ClassSize.OBJECT + ClassSize.REENTRANT_LOCK +
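
A note on the "normal skew" check introduced above: under normal operation the
older files are larger (they are the products of earlier compactions) and the
newer flushes are smaller, so sizes decrease from oldest to newest.  A
bulk-imported StoreFile can break that ordering, and the patch then skips the
ratio-based selection and unconditionally compacts up to maxFilesToCompact of
the newest files to restore order.  A minimal sketch of the check, with
hypothetical names not taken from the patch:

  public final class SkewCheckSketch {
    /**
     * Returns true when file sizes, ordered oldest to newest, are strictly
     * decreasing, except that files below minCompactSize (which always qualify
     * for compaction) are allowed anywhere.
     */
    public static boolean isNormalSkew(long[] fileSizes, long minCompactSize) {
      for (int i = 1; i < fileSizes.length; i++) {
        if (fileSizes[i] >= Math.max(fileSizes[i - 1], minCompactSize)) {
          return false;  // e.g. a bulk-imported file larger than its elders
        }
      }
      return true;
    }
  }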