Posted to commits@hbase.apache.org by st...@apache.org on 2012/05/19 00:17:09 UTC

svn commit: r1340283 - in /hbase/branches/0.94/src: main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/regionserver/

Author: stack
Date: Fri May 18 22:17:09 2012
New Revision: 1340283

URL: http://svn.apache.org/viewvc?rev=1340283&view=rev
Log:
HBASE-5920 New Compactions Logic can silently prevent user-initiated compactions from occurring
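For context, the patch moves the PRIORITY_USER and NO_PRIORITY constants from CompactSplitThread into Store and threads the request priority through Store.requestCompaction(int) and compactSelection(List, int), so that a user-requested major compaction is honored even when the store holds more candidate files than hbase.hstore.compaction.max. Below is a minimal, standalone sketch of the resulting selection rule; the class and method names are invented for illustration and are not part of the HBase API.

// Illustrative sketch only: the names are made up, and only the boolean
// logic mirrors the change to Store.compactSelection(List<StoreFile>, int).
public class CompactionPrioritySketch {
  // Mirrors Store.PRIORITY_USER / Store.NO_PRIORITY as defined in the patch.
  static final int PRIORITY_USER = 1;
  static final int NO_PRIORITY = Integer.MIN_VALUE;

  /**
   * Decide whether a compaction should run as a major compaction.
   * @param forceMajor a major compaction was explicitly triggered
   * @param isMajorDue an age-based major compaction is due
   * @param fileCount  number of candidate store files
   * @param maxFiles   hbase.hstore.compaction.max
   * @param priority   request priority (PRIORITY_USER for user requests)
   */
  static boolean runAsMajor(boolean forceMajor, boolean isMajorDue,
      int fileCount, int maxFiles, int priority) {
    // User-requested major compactions always run as major, even when there
    // are more than maxFiles candidates; before this patch they could be
    // silently downgraded to minor compactions.
    if (forceMajor && priority == PRIORITY_USER) {
      return true;
    }
    // System-initiated major compactions remain capped by maxFiles.
    return (forceMajor || isMajorDue) && fileCount < maxFiles;
  }

  public static void main(String[] args) {
    // 12 candidate files with hbase.hstore.compaction.max = 10:
    System.out.println(runAsMajor(true, false, 12, 10, PRIORITY_USER)); // true
    System.out.println(runAsMajor(true, false, 12, 10, NO_PRIORITY));   // false
  }
}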

Modified:
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java?rev=1340283&r1=1340282&r2=1340283&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java Fri May 18 22:17:09 2012
@@ -49,12 +49,6 @@ public class CompactSplitThread implemen
   private final ThreadPoolExecutor splits;
   private final long throttleSize;
 
-  /* The default priority for user-specified compaction requests.
-   * The user gets top priority unless we have blocking compactions. (Pri <= 0)
-   */
-  public static final int PRIORITY_USER = 1;
-  public static final int NO_PRIORITY = Integer.MIN_VALUE;
-
   /**
    * Splitting should not take place if the total number of regions exceed this.
    * This is not a hard limit to the number of regions but it is a guideline to
@@ -145,7 +139,7 @@ public class CompactSplitThread implemen
 
   public synchronized boolean requestSplit(final HRegion r) {
     // don't split regions that are blocking
-    if (shouldSplitRegion() && r.getCompactPriority() >= PRIORITY_USER) {
+    if (shouldSplitRegion() && r.getCompactPriority() >= Store.PRIORITY_USER) {
       byte[] midKey = r.checkSplit();
       if (midKey != null) {
         requestSplit(r, midKey);
@@ -174,13 +168,13 @@ public class CompactSplitThread implemen
   public synchronized void requestCompaction(final HRegion r,
       final String why) {
     for(Store s : r.getStores().values()) {
-      requestCompaction(r, s, why, NO_PRIORITY);
+      requestCompaction(r, s, why, Store.NO_PRIORITY);
     }
   }
 
   public synchronized void requestCompaction(final HRegion r, final Store s,
       final String why) {
-    requestCompaction(r, s, why, NO_PRIORITY);
+    requestCompaction(r, s, why, Store.NO_PRIORITY);
   }
 
   public synchronized void requestCompaction(final HRegion r, final String why,
@@ -201,10 +195,10 @@ public class CompactSplitThread implemen
     if (this.server.isStopped()) {
       return;
     }
-    CompactionRequest cr = s.requestCompaction();
+    CompactionRequest cr = s.requestCompaction(priority);
     if (cr != null) {
       cr.setServer(server);
-      if (priority != NO_PRIORITY) {
+      if (priority != Store.NO_PRIORITY) {
         cr.setPriority(priority);
       }
       ThreadPoolExecutor pool = largeCompactions;
@@ -222,6 +216,11 @@ public class CompactSplitThread implemen
             + (why != null && !why.isEmpty() ? "; Because: " + why : "")
             + "; " + this);
       }
+    } else {
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Not compacting " + r.getRegionNameAsString() + 
+            " because compaction request was cancelled");
+      }
     }
   }
 

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1340283&r1=1340282&r2=1340283&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Fri May 18 22:17:09 2012
@@ -2915,9 +2915,11 @@ public class HRegionServer implements HR
     if (major) {
       region.triggerMajorCompaction();
     }
+    LOG.trace("User-triggered compaction requested for region " +
+      region.getRegionNameAsString());
     compactSplitThread.requestCompaction(region, "User-triggered "
         + (major ? "major " : "") + "compaction",
-        CompactSplitThread.PRIORITY_USER);
+        Store.PRIORITY_USER);
   }
 
   /** @return the info server */

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1340283&r1=1340282&r2=1340283&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Fri May 18 22:17:09 2012
@@ -107,6 +107,7 @@ import com.google.common.collect.Lists;
  */
 public class Store extends SchemaConfigured implements HeapSize {
   static final Log LOG = LogFactory.getLog(Store.class);
+
   protected final MemStore memstore;
   // This stores directory in the filesystem.
   private final Path homedir;
@@ -135,6 +136,12 @@ public class Store extends SchemaConfigu
   private final int compactionKVMax;
   private final boolean verifyBulkLoads;
 
+  /* The default priority for user-specified compaction requests.
+   * The user gets top priority unless we have blocking compactions. (Pri <= 0)
+   */
+  public static final int PRIORITY_USER = 1;
+  public static final int NO_PRIORITY = Integer.MIN_VALUE;
+
   // not private for testing
   /* package */ScanInfo scanInfo;
   /*
@@ -170,7 +177,7 @@ public class Store extends SchemaConfigu
    * @param region
    * @param family HColumnDescriptor for this column
    * @param fs file system object
-   * @param conf configuration object
+   * @param confParam configuration object
    * failed.  Can be null.
    * @throws IOException
    */
@@ -317,7 +324,7 @@ public class Store extends SchemaConfigu
   public Path getHomedir() {
     return homedir;
   }
-  
+
   /**
    * @return the data block encoder
    */
@@ -435,7 +442,7 @@ public class Store extends SchemaConfigu
 
   /**
    * Removes a kv from the memstore. The KeyValue is removed only
-   * if its key & memstoreTS matches the key & memstoreTS value of the 
+   * if its key & memstoreTS matches the key & memstoreTS value of the
    * kv parameter.
    *
    * @param kv
@@ -521,8 +528,8 @@ public class Store extends SchemaConfigu
   }
 
   /**
-   * This method should only be called from HRegion.  It is assumed that the 
-   * ranges of values in the HFile fit within the stores assigned region. 
+   * This method should only be called from HRegion.  It is assumed that the
+   * ranges of values in the HFile fit within the stores assigned region.
    * (assertBulkLoadHFileOk checks this)
    */
   void bulkLoadHFile(String srcPathStr) throws IOException {
@@ -602,7 +609,7 @@ public class Store extends SchemaConfigu
         ThreadPoolExecutor storeFileCloserThreadPool = this.region
             .getStoreFileOpenAndCloseThreadPool("StoreFileCloserThread-"
                 + this.family.getNameAsString());
-  
+
         // close each store file in parallel
         CompletionService<Void> completionService =
           new ExecutorCompletionService<Void>(storeFileCloserThreadPool);
@@ -614,7 +621,7 @@ public class Store extends SchemaConfigu
             }
           });
         }
-  
+
         try {
           for (int i = 0; i < result.size(); i++) {
             Future<Void> future = completionService.take();
@@ -743,7 +750,7 @@ public class Store extends SchemaConfigu
       scanner.close();
     }
     if (LOG.isInfoEnabled()) {
-      LOG.info("Flushed " + 
+      LOG.info("Flushed " +
                ", sequenceid=" + logCacheFlushId +
                ", memsize=" + StringUtils.humanReadableInt(flushed) +
                ", into tmp file " + pathName);
@@ -954,7 +961,7 @@ public class Store extends SchemaConfigu
    * <p>We don't want to hold the structureLock for the whole time, as a compact()
    * can be lengthy and we want to allow cache-flushes during this period.
    *
-   * @param CompactionRequest
+   * @param cr
    *          compaction details obtained from requestCompaction()
    * @throws IOException
    */
@@ -1187,7 +1194,7 @@ public class Store extends SchemaConfigu
       if (jitterPct > 0) {
         long jitter = Math.round(ret * jitterPct);
         // deterministic jitter avoids a major compaction storm on restart
-        ImmutableList<StoreFile> snapshot = storefiles; 
+        ImmutableList<StoreFile> snapshot = storefiles;
         if (snapshot != null && !snapshot.isEmpty()) {
           String seed = snapshot.get(0).getPath().getName();
           double curRand = new Random(seed.hashCode()).nextDouble();
@@ -1201,6 +1208,10 @@ public class Store extends SchemaConfigu
   }
 
   public CompactionRequest requestCompaction() {
+    return requestCompaction(NO_PRIORITY);
+  }
+
+  public CompactionRequest requestCompaction(int priority) {
     // don't even select for compaction if writes are disabled
     if (!this.region.areWritesEnabled()) {
       return null;
@@ -1231,7 +1242,7 @@ public class Store extends SchemaConfigu
           // coprocessor is overriding normal file selection
           filesToCompact = new CompactSelection(conf, candidates);
         } else {
-          filesToCompact = compactSelection(candidates);
+          filesToCompact = compactSelection(candidates, priority);
         }
 
         if (region.getCoprocessorHost() != null) {
@@ -1261,7 +1272,7 @@ public class Store extends SchemaConfigu
         }
 
         // everything went better than expected. create a compaction request
-        int pri = getCompactPriority();
+        int pri = getCompactPriority(priority);
         ret = new CompactionRequest(region, this, filesToCompact, isMajor, pri);
       }
     } catch (IOException ex) {
@@ -1281,6 +1292,16 @@ public class Store extends SchemaConfigu
   }
 
   /**
+   * Algorithm to choose which files to compact, see {@link #compactSelection(java.util.List, int)}
+   * @param candidates
+   * @return
+   * @throws IOException
+   */
+  CompactSelection compactSelection(List<StoreFile> candidates) throws IOException {
+    return compactSelection(candidates,NO_PRIORITY);
+  }
+
+  /**
    * Algorithm to choose which files to compact
    *
    * Configuration knobs:
@@ -1299,7 +1320,7 @@ public class Store extends SchemaConfigu
    * @return subset copy of candidate list that meets compaction criteria
    * @throws IOException
    */
-  CompactSelection compactSelection(List<StoreFile> candidates)
+  CompactSelection compactSelection(List<StoreFile> candidates, int priority)
       throws IOException {
     // ASSUMPTION!!! filesCompacting is locked when calling this function
 
@@ -1347,10 +1368,16 @@ public class Store extends SchemaConfigu
       return compactSelection;
     }
 
-    // major compact on user action or age (caveat: we have too many files)
-    boolean majorcompaction =
-      (forcemajor || isMajorCompaction(compactSelection.getFilesToCompact()))
-      && compactSelection.getFilesToCompact().size() < this.maxFilesToCompact;
+    // Force a major compaction if this is a user-requested major compaction,
+    // or if we do not have too many files to compact and this was requested
+    // as a major compaction
+    boolean majorcompaction = (forcemajor && priority == PRIORITY_USER) ||
+      (forcemajor || isMajorCompaction(compactSelection.getFilesToCompact())) &&
+      (compactSelection.getFilesToCompact().size() < this.maxFilesToCompact
+    );
+    LOG.debug(this.getHRegionInfo().getEncodedName() + " - " +
+      this.getColumnFamilyName() + ": Initiating " +
+      (majorcompaction ? "major" : "minor") + "compaction");
 
     if (!majorcompaction &&
         !hasReferences(compactSelection.getFilesToCompact())) {
@@ -1360,6 +1387,11 @@ public class Store extends SchemaConfigu
 
       // skip selection algorithm if we don't have enough files
       if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) {
+        if(LOG.isDebugEnabled()) {
+          LOG.debug("Not compacting files because we only have " +
+            compactSelection.getFilesToCompact().size() +
+            " files ready for compaction.  Need " + this.minFilesToCompact + " to initiate.");
+        }
         compactSelection.emptyFileList();
         return compactSelection;
       }
@@ -1427,11 +1459,18 @@ public class Store extends SchemaConfigu
         return compactSelection;
       }
     } else {
-      // all files included in this compaction, up to max
-      if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {
-        int pastMax =
-          compactSelection.getFilesToCompact().size() - this.maxFilesToCompact;
-        compactSelection.clearSubList(0, pastMax);
+      if(majorcompaction) {
+        if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {
+          LOG.debug("Warning, compacting more than " + this.maxFilesToCompact +
+            " files, probably because of a user-requested major compaction");
+          if(priority != PRIORITY_USER) {
+            LOG.error("Compacting more than max files on a non user-requested compaction");
+          }
+        }
+      } else if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {
+        // all files included in this compaction, up to max
+        int pastMax = compactSelection.getFilesToCompact().size() - this.maxFilesToCompact;
+        compactSelection.getFilesToCompact().subList(0, pastMax).clear();
       }
     }
     return compactSelection;
@@ -2093,11 +2132,21 @@ public class Store extends SchemaConfigu
     return this.memstore.heapSize();
   }
 
+  public int getCompactPriority() {
+    return getCompactPriority(NO_PRIORITY);
+  }
+
   /**
    * @return The priority that this store should have in the compaction queue
+   * @param priority
    */
-  public int getCompactPriority() {
-    return this.blockingStoreFileCount - this.storefiles.size();
+  public int getCompactPriority(int priority) {
+    // If this is a user-requested compaction, leave this at the highest priority
+    if(priority == PRIORITY_USER) {
+      return PRIORITY_USER;
+    } else {
+      return this.blockingStoreFileCount - this.storefiles.size();
+    }
   }
 
   HRegion getHRegion() {
@@ -2225,7 +2274,7 @@ public class Store extends SchemaConfigu
     return this.cacheConf;
   }
 
-  public static final long FIXED_OVERHEAD = 
+  public static final long FIXED_OVERHEAD =
       ClassSize.align(SchemaConfigured.SCHEMA_CONFIGURED_UNALIGNED_HEAP_SIZE +
           + (20 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG)
           + (6 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
@@ -2303,7 +2352,7 @@ public class Store extends SchemaConfigu
     public boolean getKeepDeletedCells() {
       return keepDeletedCells;
     }
-    
+
     public long getTimeToPurgeDeletes() {
       return timeToPurgeDeletes;
     }

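The reworked getCompactPriority(int) in Store keeps user-initiated requests at the head of the compaction queue no matter how close the store is to its blocking file count. A standalone illustration of that rule follows; the class, method, and parameter names are invented for the example and are not Store's actual members.

// Illustrative sketch only: mirrors the queue-priority rule added to Store.
public class QueuePrioritySketch {
  // Mirrors Store.PRIORITY_USER as defined in the patch.
  static final int PRIORITY_USER = 1;

  // User requests stay at top priority; otherwise priority drops as the
  // store approaches the point where writes block (values <= 0 indicate
  // blocking compactions).
  static int compactPriority(int requestPriority, int blockingStoreFileCount,
      int currentStoreFileCount) {
    if (requestPriority == PRIORITY_USER) {
      return PRIORITY_USER;
    }
    return blockingStoreFileCount - currentStoreFileCount;
  }

  public static void main(String[] args) {
    // 12 store files against a blocking limit of 7:
    System.out.println(compactPriority(PRIORITY_USER, 7, 12));      // 1
    System.out.println(compactPriority(Integer.MIN_VALUE, 7, 12));  // -5
  }
}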
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java?rev=1340283&r1=1340282&r2=1340283&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java Fri May 18 22:17:09 2012
@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.experimental.categories.Category;
@@ -76,6 +77,7 @@ public class TestCompaction extends HBas
   private int compactionThreshold;
   private byte[] firstRowBytes, secondRowBytes, thirdRowBytes;
   final private byte[] col1, col2;
+  private static final long MAX_FILES_TO_COMPACT = 10;
 
   /** constructor */
   public TestCompaction() throws Exception {
@@ -612,6 +614,43 @@ public class TestCompaction extends HBas
     fail("testCompactionWithCorruptResult failed since no exception was" +
         "thrown while completing a corrupt file");
   }
+  
+  /**
+   * Test for HBASE-5920 - Test user requested major compactions always occurring
+   */
+  public void testNonUserMajorCompactionRequest() throws Exception {
+    Store store = r.getStore(COLUMN_FAMILY);
+    createStoreFile(r);
+    for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) {
+      createStoreFile(r);
+    }
+    store.triggerMajorCompaction();
+
+    CompactionRequest request = store.requestCompaction(Store.NO_PRIORITY);
+    assertNotNull("Expected to receive a compaction request", request);
+    assertEquals(
+      "System-requested major compaction should not occur if there are too many store files",
+      false,
+      request.isMajor());
+  }
+
+  /**
+   * Test for HBASE-5920
+   */
+  public void testUserMajorCompactionRequest() throws IOException{
+    Store store = r.getStore(COLUMN_FAMILY);
+    createStoreFile(r);
+    for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) {
+      createStoreFile(r);
+    }
+    store.triggerMajorCompaction();
+    CompactionRequest request = store.requestCompaction(Store.PRIORITY_USER);
+    assertNotNull("Expected to receive a compaction request", request);
+    assertEquals(
+      "User-requested major compaction should always occur, even if there are too many store files",
+      true, 
+      request.isMajor());
+  }
 
   @org.junit.Rule
   public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =