Posted to commits@hbase.apache.org by te...@apache.org on 2014/10/01 23:13:39 UTC

git commit: HBASE-12144 Backport HBASE-10141 'instead of putting expired store files thru compaction, just archive them' to 0.98

Repository: hbase
Updated Branches:
  refs/heads/0.98 bf27f3311 -> c88bec7cd


HBASE-12144 Backport HBASE-10141 'instead of putting expired store files thru compaction, just archive them' to 0.98


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c88bec7c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c88bec7c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c88bec7c

Branch: refs/heads/0.98
Commit: c88bec7cd37cbbb202d9683e5ef70ace903b6f89
Parents: bf27f33
Author: Ted Yu <te...@apache.org>
Authored: Wed Oct 1 21:13:10 2014 +0000
Committer: Ted Yu <te...@apache.org>
Committed: Wed Oct 1 21:13:10 2014 +0000

----------------------------------------------------------------------
 .../regionserver/DefaultStoreFileManager.java   | 21 ++++++++
 .../hadoop/hbase/regionserver/HStore.java       | 50 +++++++++++++++++---
 .../hbase/regionserver/StoreFileManager.java    |  7 +++
 .../regionserver/StripeStoreFileManager.java    | 30 ++++++++++++
 .../compactions/CompactionConfiguration.java    | 12 +----
 .../compactions/RatioBasedCompactionPolicy.java | 44 -----------------
 .../hadoop/hbase/regionserver/TestStore.java    | 41 ++++++++--------
 7 files changed, 124 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/c88bec7c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
index 0cba2c3..eb5522d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
@@ -135,6 +135,27 @@ class DefaultStoreFileManager implements StoreFileManager {
     return (priority == HStore.PRIORITY_USER) ? priority + 1 : priority;
   }
 
+  @Override
+  public Collection<StoreFile> getUnneededFiles(long maxTs, List<StoreFile> filesCompacting) {
+    Collection<StoreFile> expiredStoreFiles = null;
+    ImmutableList<StoreFile> files = storefiles;
+    // 1) We can never get rid of the last file which has the maximum seqid.
+    // 2) Files that are not the latest can't become one due to (1), so the rest are fair game.
+    for (int i = 0; i < files.size() - 1; ++i) {
+      StoreFile sf = files.get(i);
+      long fileTs = sf.getReader().getMaxTimestamp();
+      if (fileTs < maxTs && !filesCompacting.contains(sf)) {
+        LOG.info("Found an expired store file: " + sf.getPath()
+            + " whose maxTimeStamp is " + fileTs + ", which is below " + maxTs);
+        if (expiredStoreFiles == null) {
+          expiredStoreFiles = new ArrayList<StoreFile>();
+        }
+        expiredStoreFiles.add(sf);
+      }
+    }
+    return expiredStoreFiles;
+  }
+
   private void sortAndSetStoreFiles(List<StoreFile> storeFiles) {
     Collections.sort(storeFiles, StoreFile.Comparators.SEQ_ID);
     storefiles = ImmutableList.copyOf(storeFiles);

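The new getUnneededFiles walks the seqid-sorted file list, never considering the
newest file (the one holding the maximum seqid) or files already queued for
compaction, and collects everything whose max timestamp falls below the TTL
cutoff. A minimal, self-contained sketch of that selection rule, with plain
longs standing in for StoreFile and the filesCompacting check omitted for
brevity (illustrative only, not code from the patch):

    import java.util.ArrayList;
    import java.util.List;

    final class ExpiredSelectionSketch {
      // 'maxTimestamps' is sorted by seqid ascending, so the last entry is the
      // file that must always be kept, mirroring rule (1) above.
      static List<Integer> selectExpired(long[] maxTimestamps, long cutoff) {
        List<Integer> expired = new ArrayList<Integer>();
        for (int i = 0; i < maxTimestamps.length - 1; i++) { // never the last file
          if (maxTimestamps[i] < cutoff) {
            expired.add(i);
          }
        }
        return expired;
      }

      public static void main(String[] args) {
        long[] maxTs = {100L, 200L, 300L, 400L};
        // With cutoff 350, files 0..2 are expired; file 3 is kept regardless.
        System.out.println(selectExpired(maxTs, 350L)); // prints [0, 1, 2]
      }
    }
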
http://git-wip-us.apache.org/repos/asf/hbase/blob/c88bec7c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index a8c3fc3..c6b62d4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -153,6 +153,7 @@ public class HStore implements Store {
 
   private ScanInfo scanInfo;
 
+  // TODO: ideally, this should be part of storeFileManager, as we keep passing this to it.
   final List<StoreFile> filesCompacting = Lists.newArrayList();
 
   // All access must be synchronized.
@@ -1368,6 +1369,9 @@ public class HStore implements Store {
       return null;
     }
 
+    // Before we do compaction, try to get rid of unneeded files to simplify things.
+    removeUnneededFiles();
+
     CompactionContext compaction = storeEngine.createCompaction();
     this.lock.readLock().lock();
     try {
@@ -1422,13 +1426,7 @@ public class HStore implements Store {
           return null;
         }
 
-        // Update filesCompacting (check that we do not try to compact the same StoreFile twice).
-        if (!Collections.disjoint(filesCompacting, selectedFiles)) {
-          Preconditions.checkArgument(false, "%s overlaps with %s",
-              selectedFiles, filesCompacting);
-        }
-        filesCompacting.addAll(selectedFiles);
-        Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
+        addToCompactingFiles(selectedFiles);
 
         // If we're enqueuing a major, clear the force flag.
         boolean isMajor = selectedFiles.size() == this.getStorefilesCount();
@@ -1452,6 +1450,44 @@ public class HStore implements Store {
     return compaction;
   }
 
+  /** Adds the files to compacting files. filesCompacting must be locked. */
+  private void addToCompactingFiles(final Collection<StoreFile> filesToAdd) {
+    if (filesToAdd == null) return;
+    // Check that we do not try to compact the same StoreFile twice.
+    if (!Collections.disjoint(filesCompacting, filesToAdd)) {
+      Preconditions.checkArgument(false, "%s overlaps with %s", filesToAdd, filesCompacting);
+    }
+    filesCompacting.addAll(filesToAdd);
+    Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
+  }
+
+  private void removeUnneededFiles() throws IOException {
+    if (!conf.getBoolean("hbase.store.delete.expired.storefile", true)) return;
+    this.lock.readLock().lock();
+    Collection<StoreFile> delSfs = null;
+    try {
+      synchronized (filesCompacting) {
+        long cfTtl = getStoreFileTtl();
+        if (cfTtl != Long.MAX_VALUE) {
+          delSfs = storeEngine.getStoreFileManager().getUnneededFiles(
+              EnvironmentEdgeManager.currentTimeMillis() - cfTtl, filesCompacting);
+          addToCompactingFiles(delSfs);
+        }
+      }
+    } finally {
+      this.lock.readLock().unlock();
+    }
+    if (delSfs == null || delSfs.isEmpty()) return;
+
+    Collection<StoreFile> newFiles = new ArrayList<StoreFile>(); // No new files.
+    writeCompactionWalRecord(delSfs, newFiles);
+    replaceStoreFiles(delSfs, newFiles);
+    completeCompaction(delSfs);
+    LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) file(s) in "
+        + this + " of " + this.getRegionInfo().getRegionNameAsString()
+        + "; total size for store is " + StringUtils.humanReadableInt(storeSize));
+  }
+
   @Override
   public void cancelRequestedCompaction(CompactionContext compaction) {
     finishCompactionRequest(compaction.getRequest());

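removeUnneededFiles above is gated on the hbase.store.delete.expired.storefile
flag (default true) and on the column family actually having a TTL; the cutoff
handed to getUnneededFiles is simply the current time minus that TTL. A hedged
sketch of the gate and cutoff computation, with plain locals standing in for
the real Configuration/HStore state (everything here is illustrative):

    public class RemoveUnneededSketch {
      public static void main(String[] args) {
        boolean deleteExpired = true;          // hbase.store.delete.expired.storefile, default true
        long cfTtl = 10000L;                   // CF TTL in ms; Long.MAX_VALUE means no TTL
        long now = System.currentTimeMillis(); // the patch uses EnvironmentEdgeManager.currentTimeMillis()
        if (deleteExpired && cfTtl != Long.MAX_VALUE) {
          long cutoff = now - cfTtl;
          // Files below 'cutoff' (excluding the newest and anything already
          // compacting) are archived directly: compaction WAL record, store
          // file replacement, completeCompaction; no data is rewritten.
          System.out.println("archive files with maxTimestamp < " + cutoff);
        }
      }
    }
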
http://git-wip-us.apache.org/repos/asf/hbase/blob/c88bec7c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
index 8e838cf..f703420 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
@@ -127,4 +127,11 @@ public interface StoreFileManager {
    * @return The store compaction priority.
    */
   int getStoreCompactionPriority();
+
+  /**
+   * @param maxTs Maximum expired timestamp.
+   * @param filesCompacting Files that are currently compacting.
+   * @return The files which don't have any necessary data according to TTL and other criteria.
+   */
+  Collection<StoreFile> getUnneededFiles(long maxTs, List<StoreFile> filesCompacting);
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c88bec7c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
index 548cce8..d4e8800 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
@@ -928,4 +928,34 @@ public class StripeStoreFileManager
   public int getStripeCount() {
     return this.state.stripeFiles.size();
   }
+
+  @Override
+  public Collection<StoreFile> getUnneededFiles(long maxTs, List<StoreFile> filesCompacting) {
+    // 1) We can never get rid of the last file which has the maximum seqid in a stripe.
+    // 2) Files that are not the latest can't become one due to (1), so the rest are fair game.
+    State state = this.state;
+    Collection<StoreFile> expiredStoreFiles = null;
+    for (ImmutableList<StoreFile> stripe : state.stripeFiles) {
+      expiredStoreFiles = findExpiredFiles(stripe, maxTs, filesCompacting, expiredStoreFiles);
+    }
+    return findExpiredFiles(state.level0Files, maxTs, filesCompacting, expiredStoreFiles);
+  }
+
+  private Collection<StoreFile> findExpiredFiles(ImmutableList<StoreFile> stripe, long maxTs,
+      List<StoreFile> filesCompacting, Collection<StoreFile> expiredStoreFiles) {
+    // Order by seqnum is reversed.
+    for (int i = 1; i < stripe.size(); ++i) {
+      StoreFile sf = stripe.get(i);
+      long fileTs = sf.getReader().getMaxTimestamp();
+      if (fileTs < maxTs && !filesCompacting.contains(sf)) {
+        LOG.info("Found an expired store file: " + sf.getPath()
+            + " whose maxTimeStamp is " + fileTs + ", which is below " + maxTs);
+        if (expiredStoreFiles == null) {
+          expiredStoreFiles = new ArrayList<StoreFile>();
+        }
+        expiredStoreFiles.add(sf);
+      }
+    }
+    return expiredStoreFiles;
+  }
 }

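Note the mirror-image traversal: stripe lists are ordered with the highest
seqnum first ("Order by seqnum is reversed"), so the file that must be kept
sits at index 0 and the loop starts at 1, whereas DefaultStoreFileManager's
seqid-ascending list skips its last element. An illustrative contrast, with
plain strings standing in for StoreFile (not code from the patch):

    import java.util.Arrays;
    import java.util.List;

    public class TraversalOrderSketch {
      public static void main(String[] args) {
        // DefaultStoreFileManager: seqid ascending, newest file is last; skip it.
        List<String> ascending = Arrays.asList("oldest", "mid", "newest");
        for (int i = 0; i < ascending.size() - 1; i++) {
          System.out.println("default candidate: " + ascending.get(i));
        }
        // StripeStoreFileManager: stripe list is newest-first; skip index 0.
        List<String> reversed = Arrays.asList("newest", "mid", "oldest");
        for (int i = 1; i < reversed.size(); i++) {
          System.out.println("stripe candidate: " + reversed.get(i));
        }
      }
    }
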
http://git-wip-us.apache.org/repos/asf/hbase/blob/c88bec7c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
index 19a75f3..dc89512 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
@@ -61,7 +61,6 @@ public class CompactionConfiguration {
   double compactionRatio;
   double offPeekCompactionRatio;
   long throttlePoint;
-  boolean shouldDeleteExpired;
   long majorCompactionPeriod;
   float majorCompactionJitter;
 
@@ -80,7 +79,6 @@ public class CompactionConfiguration {
 
     throttlePoint =  conf.getLong("hbase.regionserver.thread.compaction.throttle",
           2 * maxFilesToCompact * storeConfigInfo.getMemstoreFlushSize());
-    shouldDeleteExpired = conf.getBoolean("hbase.store.delete.expired.storefile", true);
     majorCompactionPeriod = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24*7);
     // Make it 0.5 so jitter has us fall evenly either side of when the compaction should run
     majorCompactionJitter = conf.getFloat("hbase.hregion.majorcompaction.jitter", 0.50F);
@@ -92,7 +90,7 @@ public class CompactionConfiguration {
   public String toString() {
     return String.format(
       "size [%d, %d); files [%d, %d); ratio %f; off-peak ratio %f; throttle point %d;"
-      + "%s delete expired; major period %d, major jitter %f",
+      + " major period %d, major jitter %f",
       minCompactSize,
       maxCompactSize,
       minFilesToCompact,
@@ -100,7 +98,6 @@ public class CompactionConfiguration {
       compactionRatio,
       offPeekCompactionRatio,
       throttlePoint,
-      shouldDeleteExpired ? "" : " don't",
       majorCompactionPeriod,
       majorCompactionJitter);
   }
@@ -169,11 +166,4 @@ public class CompactionConfiguration {
   float getMajorCompactionJitter() {
     return majorCompactionJitter;
   }
-
-  /**
-   * @return Whether expired files should be deleted ASAP using compactions
-   */
-  boolean shouldDeleteExpired() {
-    return shouldDeleteExpired;
-  }
 }

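With expired-file handling moved out of the compaction path, the
hbase.store.delete.expired.storefile flag is no longer cached here;
HStore.removeUnneededFiles reads it straight from the Configuration (see the
HStore hunk above), and operators can still opt out per cluster. A small
sketch of reading the relocated flag via the Hadoop Configuration API, as the
patch does (the class name is illustrative):

    import org.apache.hadoop.conf.Configuration;

    public class ExpiredFlagSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setBoolean("hbase.store.delete.expired.storefile", false); // opt out of direct archival
        // Matches conf.getBoolean("hbase.store.delete.expired.storefile", true) in HStore.
        boolean deleteExpired = conf.getBoolean("hbase.store.delete.expired.storefile", true);
        System.out.println("archive expired store files directly: " + deleteExpired);
      }
    }
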
http://git-wip-us.apache.org/repos/asf/hbase/blob/c88bec7c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java
index 94f6395..c70b061 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java
@@ -93,16 +93,7 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy {
         filesCompacting.size() + " compacting, " + candidateSelection.size() +
         " eligible, " + storeConfigInfo.getBlockingFileCount() + " blocking");
 
-    long cfTtl = this.storeConfigInfo.getStoreFileTtl();
     if (!forceMajor) {
-      // If there are expired files, only select them so that compaction deletes them
-      if (comConf.shouldDeleteExpired() && (cfTtl != Long.MAX_VALUE)) {
-        ArrayList<StoreFile> expiredSelection = selectExpiredStoreFiles(
-            candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - cfTtl);
-        if (expiredSelection != null) {
-          return new CompactionRequest(expiredSelection);
-        }
-      }
       candidateSelection = skipLargeFiles(candidateSelection);
     }
 
@@ -130,41 +121,6 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy {
   }
 
   /**
-   * Select the expired store files to compact
-   *
-   * @param candidates the initial set of storeFiles
-   * @param maxExpiredTimeStamp
-   *          The store file will be marked as expired if its max time stamp is
-   *          less than this maxExpiredTimeStamp.
-   * @return A CompactSelection contains the expired store files as
-   *         filesToCompact
-   */
-  private ArrayList<StoreFile> selectExpiredStoreFiles(
-      ArrayList<StoreFile> candidates, long maxExpiredTimeStamp) {
-    if (candidates == null || candidates.size() == 0) return null;
-    ArrayList<StoreFile> expiredStoreFiles = null;
-
-    for (StoreFile storeFile : candidates) {
-      if (storeFile.getReader().getMaxTimestamp() < maxExpiredTimeStamp) {
-        LOG.info("Deleting the expired store file by compaction: "
-            + storeFile.getPath() + " whose maxTimeStamp is "
-            + storeFile.getReader().getMaxTimestamp()
-            + " while the max expired timestamp is " + maxExpiredTimeStamp);
-        if (expiredStoreFiles == null) {
-          expiredStoreFiles = new ArrayList<StoreFile>();
-        }
-        expiredStoreFiles.add(storeFile);
-      }
-    }
-    if (expiredStoreFiles != null && expiredStoreFiles.size() == 1
-        && expiredStoreFiles.get(0).getReader().getEntries() == 0) {
-      // If just one empty store file, do not select for compaction.
-      return null;
-    }
-    return expiredStoreFiles;
-  }
-
-  /**
    * @param candidates pre-filtrate
    * @return filtered subset
    * exclude all files above maxCompactSize

http://git-wip-us.apache.org/repos/asf/hbase/blob/c88bec7c/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
index d03bc63..9efd887 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
@@ -275,11 +276,15 @@ public class TestStore {
     Configuration conf = HBaseConfiguration.create();
     // Enable the expired store file deletion
     conf.setBoolean("hbase.store.delete.expired.storefile", true);
+    // Set the compaction threshold higher to avoid normal compactions.
+    conf.setInt(CompactionConfiguration.MIN_KEY, 5);
+
     HColumnDescriptor hcd = new HColumnDescriptor(family);
     hcd.setTimeToLive(ttl);
     init(name.getMethodName(), conf, hcd);
 
-    long sleepTime = this.store.getScanInfo().getTtl() / storeFileNum;
+    long storeTtl = this.store.getScanInfo().getTtl();
+    long sleepTime = storeTtl / storeFileNum;
     long timeStamp;
     // There are 4 store files and the max time stamp difference among these
     // store files will be (this.store.ttl / storeFileNum)
@@ -296,29 +301,27 @@ public class TestStore {
     // Verify the total number of store files
     Assert.assertEquals(storeFileNum, this.store.getStorefiles().size());
 
-    // Each compaction request will find one expired store file and delete it
-    // by the compaction.
-    for (int i = 1; i <= storeFileNum; i++) {
+    // Each call will find one expired store file and delete it before any compaction happens.
+    // There will be no compaction due to the threshold set above; the last file is never replaced.
+    for (int i = 1; i <= storeFileNum - 1; i++) {
       // verify the expired store file.
-      CompactionContext compaction = this.store.requestCompaction();
-      CompactionRequest cr = compaction.getRequest();
-      // the first is expired normally.
-      // If not the first compaction, there is another empty store file,
-      List<StoreFile> files = new ArrayList<StoreFile>(cr.getFiles());
-      Assert.assertEquals(Math.min(i, 2), cr.getFiles().size());
-      for (int j = 0; j < files.size(); j++) {
-        Assert.assertTrue(files.get(j).getReader().getMaxTimestamp() < (edge
-            .currentTimeMillis() - this.store.getScanInfo().getTtl()));
+      Assert.assertNull(this.store.requestCompaction());
+      Collection<StoreFile> sfs = this.store.getStorefiles();
+      // Ensure i files are gone.
+      Assert.assertEquals(storeFileNum - i, sfs.size());
+      // Ensure only non-expired files remain.
+      for (StoreFile sf : sfs) {
+        Assert.assertTrue(sf.getReader().getMaxTimestamp() >= (edge.currentTimeMillis() - storeTtl));
       }
-      // Verify that the expired store file is compacted to an empty store file.
-      // Default compaction policy creates just one and only one compacted file.
-      StoreFile compactedFile = this.store.compact(compaction).get(0);
-      // It is an empty store file.
-      Assert.assertEquals(0, compactedFile.getReader().getEntries());
-
       // Let the next store file expire.
       edge.incrementTime(sleepTime);
     }
+    Assert.assertNull(this.store.requestCompaction());
+    Collection<StoreFile> sfs = this.store.getStorefiles();
+    // Assert the last expired file is not removed.
+    Assert.assertEquals(1, sfs.size());
+    long ts = sfs.iterator().next().getReader().getMaxTimestamp();
+    Assert.assertTrue(ts < (edge.currentTimeMillis() - storeTtl));
   }
 
   @Test