You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by se...@apache.org on 2014/04/23 04:34:26 UTC
svn commit: r1589332 - in /hbase/trunk/hbase-server/src:
main/java/org/apache/hadoop/hbase/regionserver/
main/java/org/apache/hadoop/hbase/regionserver/compactions/
test/java/org/apache/hadoop/hbase/regionserver/
Author: sershe
Date: Wed Apr 23 02:34:25 2014
New Revision: 1589332
URL: http://svn.apache.org/r1589332
Log:
HBASE-10141 instead of putting expired store files thru compaction, just archive them
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java?rev=1589332&r1=1589331&r2=1589332&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java Wed Apr 23 02:34:25 2014
@@ -135,6 +135,27 @@ class DefaultStoreFileManager implements
return (priority == HStore.PRIORITY_USER) ? priority + 1 : priority;
}
+ @Override
+ public Collection<StoreFile> getUnneededFiles(long maxTs, List<StoreFile> filesCompacting) {
+ Collection<StoreFile> expiredStoreFiles = null;
+ ImmutableList<StoreFile> files = storefiles;
+ // 1) We can never get rid of the last file which has the maximum seqid.
+ // 2) Files that are not the latest can't become one due to (1), so the rest are fair game.
+ for (int i = 0; i < files.size() - 1; ++i) {
+ StoreFile sf = files.get(i);
+ long fileTs = sf.getReader().getMaxTimestamp();
+ if (fileTs < maxTs && !filesCompacting.contains(sf)) {
+ LOG.info("Found an expired store file: " + sf.getPath()
+ + " whose maxTimeStamp is " + fileTs + ", which is below " + maxTs);
+ if (expiredStoreFiles == null) {
+ expiredStoreFiles = new ArrayList<StoreFile>();
+ }
+ expiredStoreFiles.add(sf);
+ }
+ }
+ return expiredStoreFiles;
+ }
+
private void sortAndSetStoreFiles(List<StoreFile> storeFiles) {
Collections.sort(storeFiles, StoreFile.Comparators.SEQ_ID);
storefiles = ImmutableList.copyOf(storeFiles);
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1589332&r1=1589331&r2=1589332&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java Wed Apr 23 02:34:25 2014
@@ -155,6 +155,7 @@ public class HStore implements Store {
private ScanInfo scanInfo;
+ // TODO: ideally, this should be part of storeFileManager, as we keep passing this to it.
final List<StoreFile> filesCompacting = Lists.newArrayList();
// All access must be synchronized.
@@ -1343,6 +1344,9 @@ public class HStore implements Store {
return null;
}
+ // Before we do compaction, try to get rid of unneeded files to simplify things.
+ removeUnneededFiles();
+
CompactionContext compaction = storeEngine.createCompaction();
CompactionRequest request = null;
this.lock.readLock().lock();
@@ -1398,13 +1402,7 @@ public class HStore implements Store {
return null;
}
- // Update filesCompacting (check that we do not try to compact the same StoreFile twice).
- if (!Collections.disjoint(filesCompacting, selectedFiles)) {
- Preconditions.checkArgument(false, "%s overlaps with %s",
- selectedFiles, filesCompacting);
- }
- filesCompacting.addAll(selectedFiles);
- Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
+ addToCompactingFiles(selectedFiles);
// If we're enqueuing a major, clear the force flag.
this.forceMajor = this.forceMajor && !request.isMajor();
@@ -1425,6 +1423,44 @@ public class HStore implements Store {
return compaction;
}
+ /** Adds the files to compacting files. filesCompacting must be locked. */
+ private void addToCompactingFiles(final Collection<StoreFile> filesToAdd) {
+ if (filesToAdd == null) return;
+ // Check that we do not try to compact the same StoreFile twice.
+ if (!Collections.disjoint(filesCompacting, filesToAdd)) {
+ Preconditions.checkArgument(false, "%s overlaps with %s", filesToAdd, filesCompacting);
+ }
+ filesCompacting.addAll(filesToAdd);
+ Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
+ }
+
+ private void removeUnneededFiles() throws IOException {
+ if (!conf.getBoolean("hbase.store.delete.expired.storefile", true)) return;
+ this.lock.readLock().lock();
+ Collection<StoreFile> delSfs = null;
+ try {
+ synchronized (filesCompacting) {
+ long cfTtl = getStoreFileTtl();
+ if (cfTtl != Long.MAX_VALUE) {
+ delSfs = storeEngine.getStoreFileManager().getUnneededFiles(
+ EnvironmentEdgeManager.currentTimeMillis() - cfTtl, filesCompacting);
+ addToCompactingFiles(delSfs);
+ }
+ }
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ if (delSfs == null || delSfs.isEmpty()) return;
+
+ Collection<StoreFile> newFiles = new ArrayList<StoreFile>(); // No new files.
+ writeCompactionWalRecord(delSfs, newFiles);
+ replaceStoreFiles(delSfs, newFiles);
+ completeCompaction(delSfs);
+ LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) file(s) in "
+ + this + " of " + this.getRegionInfo().getRegionNameAsString()
+ + "; total size for store is " + StringUtils.humanReadableInt(storeSize));
+ }
+
@Override
public void cancelRequestedCompaction(CompactionContext compaction) {
finishCompactionRequest(compaction.getRequest());
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java?rev=1589332&r1=1589331&r2=1589332&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java Wed Apr 23 02:34:25 2014
@@ -127,4 +127,11 @@ public interface StoreFileManager {
* @return The store compaction priority.
*/
int getStoreCompactionPriority();
+
+ /**
+ * @param maxTs Expiration cutoff; files whose maximum timestamp is below it are expired.
+ * @param filesCompacting Files that are currently compacting; these are never returned.
+ * @return The files which hold no necessary data per TTL and other criteria; may be null.
+ */
+ Collection<StoreFile> getUnneededFiles(long maxTs, List<StoreFile> filesCompacting);
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java?rev=1589332&r1=1589331&r2=1589332&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java Wed Apr 23 02:34:25 2014
@@ -919,4 +919,34 @@ public class StripeStoreFileManager
public int getStripeCount() {
return this.state.stripeFiles.size();
}
+
+ @Override
+ public Collection<StoreFile> getUnneededFiles(long maxTs, List<StoreFile> filesCompacting) {
+ // 1) We can never get rid of the last file which has the maximum seqid in a stripe.
+ // 2) Files that are not the latest can't become one due to (1), so the rest are fair game.
+ State state = this.state;
+ Collection<StoreFile> expiredStoreFiles = null;
+ for (ImmutableList<StoreFile> stripe : state.stripeFiles) {
+ expiredStoreFiles = findExpiredFiles(stripe, maxTs, filesCompacting, expiredStoreFiles);
+ }
+ return findExpiredFiles(state.level0Files, maxTs, filesCompacting, expiredStoreFiles);
+ }
+
+ private Collection<StoreFile> findExpiredFiles(ImmutableList<StoreFile> stripe, long maxTs,
+ List<StoreFile> filesCompacting, Collection<StoreFile> expiredStoreFiles) {
+ // Files are ordered by seqnum descending, so index 0 (the max seqid) is never considered.
+ for (int i = 1; i < stripe.size(); ++i) {
+ StoreFile sf = stripe.get(i);
+ long fileTs = sf.getReader().getMaxTimestamp();
+ if (fileTs < maxTs && !filesCompacting.contains(sf)) {
+ LOG.info("Found an expired store file: " + sf.getPath()
+ + " whose maxTimeStamp is " + fileTs + ", which is below " + maxTs);
+ if (expiredStoreFiles == null) {
+ expiredStoreFiles = new ArrayList<StoreFile>();
+ }
+ expiredStoreFiles.add(sf);
+ }
+ }
+ return expiredStoreFiles;
+ }
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java?rev=1589332&r1=1589331&r2=1589332&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java Wed Apr 23 02:34:25 2014
@@ -61,7 +61,6 @@ public class CompactionConfiguration {
double compactionRatio;
double offPeekCompactionRatio;
long throttlePoint;
- boolean shouldDeleteExpired;
long majorCompactionPeriod;
float majorCompactionJitter;
@@ -80,7 +79,6 @@ public class CompactionConfiguration {
throttlePoint = conf.getLong("hbase.regionserver.thread.compaction.throttle",
2 * maxFilesToCompact * storeConfigInfo.getMemstoreFlushSize());
- shouldDeleteExpired = conf.getBoolean("hbase.store.delete.expired.storefile", true);
majorCompactionPeriod = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24*7);
// Make it 0.5 so jitter has us fall evenly either side of when the compaction should run
majorCompactionJitter = conf.getFloat("hbase.hregion.majorcompaction.jitter", 0.50F);
@@ -92,7 +90,7 @@ public class CompactionConfiguration {
public String toString() {
return String.format(
"size [%d, %d); files [%d, %d); ratio %f; off-peak ratio %f; throttle point %d;"
- + "%s delete expired; major period %d, major jitter %f",
+ + " major period %d, major jitter %f",
minCompactSize,
maxCompactSize,
minFilesToCompact,
@@ -100,7 +98,6 @@ public class CompactionConfiguration {
compactionRatio,
offPeekCompactionRatio,
throttlePoint,
- shouldDeleteExpired ? "" : " don't",
majorCompactionPeriod,
majorCompactionJitter);
}
@@ -169,11 +166,4 @@ public class CompactionConfiguration {
float getMajorCompactionJitter() {
return majorCompactionJitter;
}
-
- /**
- * @return Whether expired files should be deleted ASAP using compactions
- */
- boolean shouldDeleteExpired() {
- return shouldDeleteExpired;
- }
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java?rev=1589332&r1=1589331&r2=1589332&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java Wed Apr 23 02:34:25 2014
@@ -96,15 +96,6 @@ public class RatioBasedCompactionPolicy
// If we can't have all files, we cannot do major anyway
boolean isAllFiles = candidateFiles.size() == candidateSelection.size();
if (!(forceMajor && isAllFiles)) {
- // If there are expired files, only select them so that compaction deletes them
- long cfTtl = this.storeConfigInfo.getStoreFileTtl();
- if (comConf.shouldDeleteExpired() && (cfTtl != Long.MAX_VALUE)) {
- ArrayList<StoreFile> expiredSelection = selectExpiredStoreFiles(
- candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - cfTtl);
- if (expiredSelection != null) {
- return new CompactionRequest(expiredSelection);
- }
- }
candidateSelection = skipLargeFiles(candidateSelection);
isAllFiles = candidateFiles.size() == candidateSelection.size();
}
@@ -132,41 +123,6 @@ public class RatioBasedCompactionPolicy
}
/**
- * Select the expired store files to compact
- *
- * @param candidates the initial set of storeFiles
- * @param maxExpiredTimeStamp
- * The store file will be marked as expired if its max time stamp is
- * less than this maxExpiredTimeStamp.
- * @return A CompactSelection contains the expired store files as
- * filesToCompact
- */
- private ArrayList<StoreFile> selectExpiredStoreFiles(
- ArrayList<StoreFile> candidates, long maxExpiredTimeStamp) {
- if (candidates == null || candidates.size() == 0) return null;
- ArrayList<StoreFile> expiredStoreFiles = null;
-
- for (StoreFile storeFile : candidates) {
- if (storeFile.getReader().getMaxTimestamp() < maxExpiredTimeStamp) {
- LOG.info("Deleting the expired store file by compaction: "
- + storeFile.getPath() + " whose maxTimeStamp is "
- + storeFile.getReader().getMaxTimestamp()
- + " while the max expired timestamp is " + maxExpiredTimeStamp);
- if (expiredStoreFiles == null) {
- expiredStoreFiles = new ArrayList<StoreFile>();
- }
- expiredStoreFiles.add(storeFile);
- }
- }
- if (expiredStoreFiles != null && expiredStoreFiles.size() == 1
- && expiredStoreFiles.get(0).getReader().getEntries() == 0) {
- // If just one empty store file, do not select for compaction.
- return null;
- }
- return expiredStoreFiles;
- }
-
- /**
* @param candidates pre-filtrate
* @return filtered subset
* exclude all files above maxCompactSize
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java?rev=1589332&r1=1589331&r2=1589332&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java Wed Apr 23 02:34:25 2014
@@ -19,6 +19,8 @@
package org.apache.hadoop.hbase.regionserver;
+import static org.junit.Assert.*;
+
import java.io.IOException;
import java.lang.ref.SoftReference;
import java.security.PrivilegedExceptionAction;
@@ -61,6 +63,7 @@ import org.apache.hadoop.hbase.io.hfile.
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
@@ -271,15 +274,19 @@ public class TestStore {
int ttl = 4;
IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
EnvironmentEdgeManagerTestHelper.injectEdge(edge);
-
+
Configuration conf = HBaseConfiguration.create();
// Enable the expired store file deletion
conf.setBoolean("hbase.store.delete.expired.storefile", true);
+ // Set the compaction threshold higher to avoid normal compactions.
+ conf.setInt(CompactionConfiguration.MIN_KEY, 5);
+
HColumnDescriptor hcd = new HColumnDescriptor(family);
hcd.setTimeToLive(ttl);
init(name.getMethodName(), conf, hcd);
- long sleepTime = this.store.getScanInfo().getTtl() / storeFileNum;
+ long storeTtl = this.store.getScanInfo().getTtl();
+ long sleepTime = storeTtl / storeFileNum;
long timeStamp;
// There are 4 store files and the max time stamp difference among these
// store files will be (this.store.ttl / storeFileNum)
@@ -296,29 +303,27 @@ public class TestStore {
// Verify the total number of store files
Assert.assertEquals(storeFileNum, this.store.getStorefiles().size());
- // Each compaction request will find one expired store file and delete it
- // by the compaction.
- for (int i = 1; i <= storeFileNum; i++) {
+ // Each call will find one expired store file and delete it before compaction happens.
+ // There will be no compaction due to threshold above. Last file will not be replaced.
+ for (int i = 1; i <= storeFileNum - 1; i++) {
// verify the expired store file.
- CompactionContext compaction = this.store.requestCompaction();
- CompactionRequest cr = compaction.getRequest();
- // the first is expired normally.
- // If not the first compaction, there is another empty store file,
- List<StoreFile> files = new ArrayList<StoreFile>(cr.getFiles());
- Assert.assertEquals(Math.min(i, 2), cr.getFiles().size());
- for (int j = 0; j < files.size(); j++) {
- Assert.assertTrue(files.get(j).getReader().getMaxTimestamp() < (edge
- .currentTimeMillis() - this.store.getScanInfo().getTtl()));
+ assertNull(this.store.requestCompaction());
+ Collection<StoreFile> sfs = this.store.getStorefiles();
+ // Ensure i files are gone.
+ assertEquals(storeFileNum - i, sfs.size());
+ // Ensure only non-expired files remain.
+ for (StoreFile sf : sfs) {
+ assertTrue(sf.getReader().getMaxTimestamp() >= (edge.currentTimeMillis() - storeTtl));
}
- // Verify that the expired store file is compacted to an empty store file.
- // Default compaction policy creates just one and only one compacted file.
- StoreFile compactedFile = this.store.compact(compaction).get(0);
- // It is an empty store file.
- Assert.assertEquals(0, compactedFile.getReader().getEntries());
-
// Let the next store file expired.
edge.incrementTime(sleepTime);
}
+ assertNull(this.store.requestCompaction());
+ Collection<StoreFile> sfs = this.store.getStorefiles();
+ // Assert the last expired file is not removed.
+ assertEquals(1, sfs.size());
+ long ts = sfs.iterator().next().getReader().getMaxTimestamp();
+ assertTrue(ts < (edge.currentTimeMillis() - storeTtl));
}
@Test