You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2013/02/09 01:26:56 UTC

svn commit: r1444298 - in /hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver: HStore.java MemStoreFlusher.java compactions/CompactionConfiguration.java compactions/CompactionPolicy.java compactions/DefaultCompactionPolicy.java

Author: tedyu
Date: Sat Feb  9 00:26:56 2013
New Revision: 1444298

URL: http://svn.apache.org/r1444298
Log:
HBASE-7784 move the code related to compaction selection specific to default compaction policy, into default compaction policy (from HStore) (Sergey)


Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicy.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1444298&r1=1444297&r2=1444298&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java Sat Feb  9 00:26:56 2013
@@ -108,6 +108,8 @@ import com.google.common.collect.Lists;
  */
 @InterfaceAudience.Private
 public class HStore implements Store {
+  public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 7;
+
   static final Log LOG = LogFactory.getLog(HStore.class);
 
   protected final MemStore memstore;
@@ -125,7 +127,6 @@ public class HStore implements Store {
   volatile boolean forceMajor = false;
   /* how many bytes to write between status checks */
   static int closeCheckInterval = 0;
-  private final int blockingStoreFileCount;
   private volatile long storeSize = 0L;
   private volatile long totalUncompressedBytes = 0L;
   private final Object flushLock = new Object();
@@ -209,8 +210,7 @@ public class HStore implements Store {
 
     // Setting up cache configuration for this family
     this.cacheConf = new CacheConfig(conf, family);
-    this.blockingStoreFileCount =
-      conf.getInt("hbase.hstore.blockingStoreFiles", 7);
+
 
     this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
 
@@ -1230,22 +1230,13 @@ public class HStore implements Store {
     CompactionRequest ret = null;
     this.lock.readLock().lock();
     try {
+      List<StoreFile> candidates = Lists.newArrayList(storeFileManager.getStorefiles());
       synchronized (filesCompacting) {
-        // candidates = all StoreFiles not already in compaction queue
-        List<StoreFile> candidates = Lists.newArrayList(storeFileManager.getStorefiles());
-        if (!filesCompacting.isEmpty()) {
-          // exclude all files older than the newest file we're currently
-          // compacting. this allows us to preserve contiguity (HBASE-2856)
-          StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
-          int idx = candidates.indexOf(last);
-          Preconditions.checkArgument(idx != -1);
-          candidates.subList(0, idx + 1).clear();
-        }
-
+        // First we need to pre-select compaction, and then pre-compact selection!
+        candidates = compactionPolicy.preSelectCompaction(candidates, filesCompacting);
         boolean override = false;
         if (region.getCoprocessorHost() != null) {
-          override = region.getCoprocessorHost().preCompactSelection(
-              this, candidates);
+          override = region.getCoprocessorHost().preCompactSelection(this, candidates);
         }
         CompactSelection filesToCompact;
         if (override) {
@@ -1739,12 +1730,12 @@ public class HStore implements Store {
 
   @Override
   public int getCompactPriority(int priority) {
-    // If this is a user-requested compaction, leave this at the highest priority
-    if(priority == Store.PRIORITY_USER) {
-      return Store.PRIORITY_USER;
-    } else {
-      return this.blockingStoreFileCount - this.storeFileManager.getStorefileCount();
+    // If this is a user-requested compaction, leave this at the user priority
+    if (priority != Store.PRIORITY_USER) {
+      priority = this.compactionPolicy.getSystemCompactionPriority(
+        this.storeFileManager.getStorefiles());
     }
+    return priority;
   }
 
   @Override
@@ -1855,8 +1846,7 @@ public class HStore implements Store {
 
   @Override
   public boolean needsCompaction() {
-    return compactionPolicy.needsCompaction(
-      this.storeFileManager.getStorefileCount() - filesCompacting.size());
+    return compactionPolicy.needsCompaction(this.storeFileManager.getStorefiles(), filesCompacting);
   }
 
   @Override
@@ -1866,7 +1856,7 @@ public class HStore implements Store {
 
   public static final long FIXED_OVERHEAD =
       ClassSize.align((20 * ClassSize.REFERENCE) + (4 * Bytes.SIZEOF_LONG)
-              + (3 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
+              + (2 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
 
   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
       + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java?rev=1444298&r1=1444297&r2=1444298&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java Sat Feb  9 00:26:56 2013
@@ -85,7 +85,7 @@ class MemStoreFlusher implements FlushRe
     "hbase.regionserver.global.memstore.upperLimit";
   private static final String LOWER_KEY =
     "hbase.regionserver.global.memstore.lowerLimit";
-  private long blockingStoreFilesNumber;
+  private int blockingStoreFileCount;
   private long blockingWaitTime;
   private final Counter updatesBlockedMsHighWater = new Counter();
 
@@ -112,8 +112,8 @@ class MemStoreFlusher implements FlushRe
         "because supplied " + LOWER_KEY + " was > " + UPPER_KEY);
     }
     this.globalMemStoreLimitLowMark = lower;
-    this.blockingStoreFilesNumber =
-      conf.getInt("hbase.hstore.blockingStoreFiles", 7);
+    this.blockingStoreFileCount =
+      conf.getInt("hbase.hstore.blockingStoreFiles", HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
     this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
       90000);
     this.handlerCount = conf.getInt("hbase.hstore.flusher.count", 1);
@@ -482,7 +482,7 @@ class MemStoreFlusher implements FlushRe
 
   private boolean isTooManyStoreFiles(HRegion region) {
     for (Store hstore : region.stores.values()) {
-      if (hstore.getStorefilesCount() > this.blockingStoreFilesNumber) {
+      if (hstore.getStorefilesCount() > this.blockingStoreFileCount) {
         return true;
       }
     }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java?rev=1444298&r1=1444297&r2=1444298&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java Sat Feb  9 00:26:56 2013
@@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
 
 /**
@@ -63,6 +64,7 @@ public class CompactionConfiguration {
   boolean shouldDeleteExpired;
   long majorCompactionPeriod;
   float majorCompactionJitter;
+  int blockingStoreFileCount;
 
   CompactionConfiguration(Configuration conf, StoreConfigInformation storeConfigInfo) {
     this.conf = conf;
@@ -93,6 +95,8 @@ public class CompactionConfiguration {
     shouldDeleteExpired = conf.getBoolean("hbase.store.delete.expired.storefile", true);
     majorCompactionPeriod = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24);
     majorCompactionJitter = conf.getFloat("hbase.hregion.majorcompaction.jitter", 0.20F);
+    blockingStoreFileCount =
+        conf.getInt("hbase.hstore.blockingStoreFiles", HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
 
     LOG.info("Compaction configuration " + this.toString());
   }
@@ -117,6 +121,13 @@ public class CompactionConfiguration {
   }
 
   /**
+   * @return store file count that will cause the memstore of this store to be blocked.
+   */
+  int getBlockingStorefileCount() {
+    return this.blockingStoreFileCount;
+  }
+
+  /**
    * @return lower bound below which compaction is selected without ratio test
    */
   long getMinCompactSize() {
@@ -205,4 +216,4 @@ public class CompactionConfiguration {
   private static boolean isValidHour(int hour) {
     return (hour >= 0 && hour <= 23);
   }
-}
\ No newline at end of file
+}

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java?rev=1444298&r1=1444297&r2=1444298&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java Sat Feb  9 00:26:56 2013
@@ -54,6 +54,16 @@ public abstract class CompactionPolicy e
   HStore store;
 
   /**
+   * This is called before coprocessor preCompactSelection and should filter the candidates
+   * for coprocessor; i.e. exclude the files that definitely cannot be compacted at this time.
+   * @param candidateFiles candidate files, ordered from oldest to newest
+   * @param filesCompacting files currently compacting
+   * @return the list of files that can theoretically be compacted.
+   */
+  public abstract List<StoreFile> preSelectCompaction(
+      List<StoreFile> candidateFiles, final List<StoreFile> filesCompacting);
+
+  /**
    * @param candidateFiles candidate files, ordered from oldest to newest
    * @return subset copy of candidate list that meets compaction criteria
    * @throws java.io.IOException
@@ -63,6 +73,16 @@ public abstract class CompactionPolicy e
     final boolean forceMajor) throws IOException;
 
   /**
+   * @param storeFiles Store files in the store.
+   * @return The system compaction priority of the store, based on storeFiles.
+   *         The priority range is as such - the smaller values are higher priority;
+   *         1 is user priority; only very important, blocking compactions should use
+   *         values lower than that. With default settings, depending on the number of
+   *         store files, the non-blocking priority will be in 2-6 range.
+   */
+  public abstract int getSystemCompactionPriority(final Collection<StoreFile> storeFiles);
+
+  /**
    * @param filesToCompact Files to compact. Can be null.
    * @return True if we should run a major compaction.
    */
@@ -76,10 +96,12 @@ public abstract class CompactionPolicy e
   public abstract boolean throttleCompaction(long compactionSize);
 
   /**
-   * @param numCandidates Number of candidate store files
+   * @param storeFiles Current store files.
+   * @param filesCompacting files currently compacting.
    * @return whether a compactionSelection is possible
    */
-  public abstract boolean needsCompaction(int numCandidates);
+  public abstract boolean needsCompaction(final Collection<StoreFile> storeFiles,
+      final List<StoreFile> filesCompacting);
 
   /**
    * Inform the policy that some configuration has been change,

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicy.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicy.java?rev=1444298&r1=1444297&r2=1444298&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicy.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicy.java Sat Feb  9 00:26:56 2013
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.StoreUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
+import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Collections2;
 
@@ -53,6 +54,26 @@ public class DefaultCompactionPolicy ext
     compactor = new DefaultCompactor(this);
   }
 
+  @Override
+  public List<StoreFile> preSelectCompaction(
+      List<StoreFile> candidateFiles, final List<StoreFile> filesCompacting) {
+    // candidates = all storefiles not already in compaction queue
+    if (!filesCompacting.isEmpty()) {
+      // exclude all files older than the newest file we're currently
+      // compacting. this allows us to preserve contiguity (HBASE-2856)
+      StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
+      int idx = candidateFiles.indexOf(last);
+      Preconditions.checkArgument(idx != -1);
+      candidateFiles.subList(0, idx + 1).clear();
+    }
+    return candidateFiles;
+  }
+
+  @Override
+  public int getSystemCompactionPriority(final Collection<StoreFile> storeFiles) {
+    return this.comConf.getBlockingStorefileCount() - storeFiles.size();
+  }
+
   /**
    * @param candidateFiles candidate files, ordered from oldest to newest
    * @return subset copy of candidate list that meets compaction criteria
@@ -367,11 +388,10 @@ public class DefaultCompactionPolicy ext
     return compactionSize > comConf.getThrottlePoint();
   }
 
-  /**
-   * @param numCandidates Number of candidate store files
-   * @return whether a compactionSelection is possible
-   */
-  public boolean needsCompaction(int numCandidates) {
+  @Override
+  public boolean needsCompaction(final Collection<StoreFile> storeFiles,
+      final List<StoreFile> filesCompacting) {
+    int numCandidates = storeFiles.size() - filesCompacting.size();
     return numCandidates > comConf.getMinFilesToCompact();
   }