Posted to commits@hbase.apache.org by se...@apache.org on 2013/03/19 01:54:44 UTC
svn commit: r1458073 - in
/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver:
StoreFile.java compactions/Compactor.java compactions/DefaultCompactor.java
Author: sershe
Date: Tue Mar 19 00:54:44 2013
New Revision: 1458073
URL: http://svn.apache.org/r1458073
Log:
HBASE-8080 refactor default compactor to make its parts easier to reuse
Modified:
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
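A note on what the refactor buys: the steps every compaction performs regardless of policy (collecting file details, opening store file scanners, setting the MVCC read point, and the KeyValue copy loop) now live as protected helpers on the Compactor base class, so an alternative compactor can compose them rather than duplicate DefaultCompactor. Below is a minimal sketch of such reuse; MyCompactor is hypothetical and not part of this commit, it assumes the same package (the Compactor constructor is package-private), and it omits the coprocessor hooks for brevity.

package org.apache.hadoop.hbase.regionserver.compactions;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;

/** Hypothetical compactor showing how the new base-class helpers compose. */
public class MyCompactor extends Compactor {
  public MyCompactor(Configuration conf, Store store) {
    super(conf, store);
  }

  public List<Path> compact(CompactionRequest request) throws IOException {
    // Shared bookkeeping: max seq id, key count for blooms, earliest put ts.
    FileDetails fd = getFileDetails(request.getFiles(), request.isMajor());
    this.progress = new CompactionProgress(fd.maxKeyCount);
    // Shared scanner setup over the input store files.
    List<StoreFileScanner> scanners = createFileScanners(request.getFiles());
    long smallestReadPoint = setSmallestReadPoint();
    ScanType scanType = request.isMajor()
        ? ScanType.COMPACT_DROP_DELETES : ScanType.COMPACT_RETAIN_DELETES;
    InternalScanner scanner =
        createScanner(store, scanners, scanType, smallestReadPoint, fd.earliestPutTs);
    StoreFile.Writer writer = null;
    List<Path> newFiles = new ArrayList<Path>();
    try {
      writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true);
      // Shared copy loop; StoreFile.Writer now satisfies Compactor.CellSink.
      if (!performCompaction(scanner, writer, smallestReadPoint)) {
        abortWriter(writer); // closes the writer and deletes its tmp file
        writer = null;
        throw new InterruptedIOException("Compaction of " + store + " was interrupted.");
      }
    } finally {
      scanner.close();
      if (writer != null) {
        writer.appendMetadata(fd.maxSeqId, request.isMajor());
        writer.close();
        newFiles.add(writer.getPath());
      }
    }
    return newFiles;
  }
}

Every helper called above (getFileDetails, createFileScanners, setSmallestReadPoint, createScanner, performCompaction, abortWriter) is introduced by the Compactor.java change below.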
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1458073&r1=1458072&r2=1458073&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Tue Mar 19 00:54:44 2013
@@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.io.hfile.
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
+import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
import org.apache.hadoop.hbase.util.BloomFilter;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
@@ -987,7 +988,7 @@ public class StoreFile {
* A StoreFile writer. Use this to read/write HBase Store Files. It is package
* local because it is an implementation detail of the HBase regionserver.
*/
- public static class Writer {
+ public static class Writer implements Compactor.CellSink {
private final BloomFilterWriter generalBloomFilterWriter;
private final BloomFilterWriter deleteFamilyBloomFilterWriter;
private final BloomType bloomType;
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java?rev=1458073&r1=1458072&r2=1458073&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java Tue Mar 19 00:54:44 2013
@@ -18,25 +18,59 @@
package org.apache.hadoop.hbase.regionserver.compactions;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.CellOutputStream;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+import org.apache.hadoop.hbase.regionserver.StoreScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.StringUtils;
/**
- * A compactor is a compaction algorithm associated a given policy.
+ * A compactor is a compaction algorithm associated with a given policy. The base class also
+ * contains reusable parts for implementing compactors (what is common and what isn't is evolving).
*/
@InterfaceAudience.Private
public abstract class Compactor {
-
+ private static final Log LOG = LogFactory.getLog(Compactor.class);
protected CompactionProgress progress;
protected Configuration conf;
+ protected Store store;
+
+ private int compactionKVMax;
+ protected Compression.Algorithm compactionCompression;
- Compactor(final Configuration conf) {
+ //TODO: depending on Store is not good but, realistically, all compactors currently depend on it.
+ Compactor(final Configuration conf, final Store store) {
this.conf = conf;
+ this.store = store;
+ this.compactionKVMax = this.conf.getInt(HConstants.COMPACTION_KV_MAX, 10);
+ this.compactionCompression = (this.store.getFamily() == null) ?
+ Compression.Algorithm.NONE : this.store.getFamily().getCompactionCompression();
+ }
+
+ /**
+ * TODO: Replace this with {@link CellOutputStream} when StoreFile.Writer uses cells.
+ */
+ public interface CellSink {
+ void append(KeyValue kv) throws IOException;
}
/**
@@ -68,4 +102,137 @@ public abstract class Compactor {
public CompactionProgress getProgress() {
return this.progress;
}
+
+ /** The sole reason this class exists is that Java has no ref/out/pointer parameters. */
+ protected static class FileDetails {
+ /** Maximum key count after compaction (for blooms) */
+ public int maxKeyCount = 0;
+ /** Earliest put timestamp if major compaction */
+ public long earliestPutTs = HConstants.LATEST_TIMESTAMP;
+ /** Max sequence ID among the files we're compacting. */
+ public long maxSeqId = 0;
+ }
+
+ protected FileDetails getFileDetails(
+ Collection<StoreFile> filesToCompact, boolean calculatePutTs) throws IOException {
+ FileDetails fd = new FileDetails();
+
+ for (StoreFile file : filesToCompact) {
+ long seqNum = file.getMaxSequenceId();
+ fd.maxSeqId = Math.max(fd.maxSeqId, seqNum);
+ StoreFile.Reader r = file.getReader();
+ if (r == null) {
+ LOG.warn("Null reader for " + file.getPath());
+ continue;
+ }
+ // NOTE: getFilterEntries could cause under-sized blooms if the user
+ // switches bloom type (e.g. from ROW to ROWCOL)
+ long keyCount = (r.getBloomFilterType() == store.getFamily().getBloomFilterType())
+ ? r.getFilterEntries() : r.getEntries();
+ fd.maxKeyCount += keyCount;
+ // If required, calculate the earliest put timestamp of all involved storefiles.
+ // This is used to remove family delete marker during compaction.
+ long earliestPutTs = 0;
+ if (calculatePutTs) {
+ byte [] tmp = r.loadFileInfo().get(StoreFile.EARLIEST_PUT_TS);
+ if (tmp == null) {
+ // There's a file with no information, must be an old one
+ // assume we have very old puts
+ fd.earliestPutTs = earliestPutTs = HConstants.OLDEST_TIMESTAMP;
+ } else {
+ earliestPutTs = Bytes.toLong(tmp);
+ fd.earliestPutTs = Math.min(fd.earliestPutTs, earliestPutTs);
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Compacting " + file +
+ ", keycount=" + keyCount +
+ ", bloomtype=" + r.getBloomFilterType().toString() +
+ ", size=" + StringUtils.humanReadableInt(r.length()) +
+ ", encoding=" + r.getHFileReader().getEncodingOnDisk() +
+ ", seqNum=" + seqNum +
+ (calculatePutTs ? ", earliestPutTs=" + earliestPutTs: ""));
+ }
+ }
+ return fd;
+ }
+
+ protected List<StoreFileScanner> createFileScanners(
+ final Collection<StoreFile> filesToCompact) throws IOException {
+ return StoreFileScanner.getScannersForStoreFiles(filesToCompact, false, false, true);
+ }
+
+ protected long setSmallestReadPoint() {
+ long smallestReadPoint = store.getSmallestReadPoint();
+ MultiVersionConsistencyControl.setThreadReadPoint(smallestReadPoint);
+ return smallestReadPoint;
+ }
+
+ protected InternalScanner preCreateCoprocScanner(final CompactionRequest request,
+ ScanType scanType, long earliestPutTs, List<StoreFileScanner> scanners) throws IOException {
+ if (store.getCoprocessorHost() == null) return null;
+ return store.getCoprocessorHost()
+ .preCompactScannerOpen(store, scanners, scanType, earliestPutTs, request);
+ }
+
+ protected InternalScanner postCreateCoprocScanner(final CompactionRequest request,
+ ScanType scanType, InternalScanner scanner) throws IOException {
+ if (store.getCoprocessorHost() == null) return scanner;
+ return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
+ }
+
+ @SuppressWarnings("deprecation")
+ protected boolean performCompaction(InternalScanner scanner,
+ CellSink writer, long smallestReadPoint) throws IOException {
+ int bytesWritten = 0;
+ // Since scanner.next() can return 'false' but still be delivering data,
+ // we have to use a do/while loop.
+ List<KeyValue> kvs = new ArrayList<KeyValue>();
+ // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
+ int closeCheckInterval = HStore.getCloseCheckInterval();
+ boolean hasMore;
+ do {
+ hasMore = scanner.next(kvs, compactionKVMax);
+ // output to writer:
+ for (KeyValue kv : kvs) {
+ if (kv.getMemstoreTS() <= smallestReadPoint) {
+ kv.setMemstoreTS(0);
+ }
+ writer.append(kv);
+ // update progress per key
+ ++progress.currentCompactedKVs;
+
+ // check periodically to see if a system stop is requested
+ if (closeCheckInterval > 0) {
+ bytesWritten += kv.getLength();
+ if (bytesWritten > closeCheckInterval) {
+ bytesWritten = 0;
+ if (!store.areWritesEnabled()) return false;
+ }
+ }
+ }
+ kvs.clear();
+ } while (hasMore);
+ return true;
+ }
+
+ protected void abortWriter(final StoreFile.Writer writer) throws IOException {
+ writer.close();
+ store.getFileSystem().delete(writer.getPath(), false);
+ }
+
+ /**
+ * @param scanners Store file scanners.
+ * @param scanType Scan type.
+ * @param smallestReadPoint Smallest MVCC read point.
+ * @param earliestPutTs Earliest put across all files.
+ * @return A compaction scanner.
+ */
+ protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
+ ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
+ Scan scan = new Scan();
+ scan.setMaxVersions(store.getFamily().getMaxVersions());
+ return new StoreScanner(store, store.getScanInfo(), scan, scanners,
+ scanType, smallestReadPoint, earliestPutTs);
+ }
}
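The CellSink interface added above is what keeps performCompaction writer-agnostic: StoreFile.Writer (see the StoreFile.java change) is just one implementation, and any other sink can stand in. A minimal sketch, where CollectingCellSink is a hypothetical class for exercising a compaction scanner without touching the filesystem:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.KeyValue;

// Hypothetical sink that buffers KVs in memory instead of writing an HFile.
class CollectingCellSink implements Compactor.CellSink {
  final List<KeyValue> cells = new ArrayList<KeyValue>();

  @Override
  public void append(KeyValue kv) throws IOException {
    cells.add(kv); // performCompaction pushes every surviving KV through here
  }
}

Per the TODO above, the interface is a stopgap: once StoreFile.Writer deals in cells, CellOutputStream would take its place.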
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java?rev=1458073&r1=1458072&r2=1458073&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java Tue Mar 19 00:54:44 2013
@@ -49,170 +49,62 @@ import org.apache.hadoop.util.StringUtil
@InterfaceAudience.Private
public class DefaultCompactor extends Compactor {
private static final Log LOG = LogFactory.getLog(DefaultCompactor.class);
- private final Store store;
public DefaultCompactor(final Configuration conf, final Store store) {
- super(conf);
- this.store = store;
+ super(conf, store);
}
/**
* Do a minor/major compaction on an explicit set of storefiles from a Store.
*/
- @SuppressWarnings("deprecation")
- @Override
public List<Path> compact(final CompactionRequest request) throws IOException {
- final Collection<StoreFile> filesToCompact = request.getFiles();
- boolean majorCompaction = request.isMajor();
- // Max-sequenceID is the last key in the files we're compacting
- long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact, true);
+ FileDetails fd = getFileDetails(request.getFiles(), request.isMajor());
+ this.progress = new CompactionProgress(fd.maxKeyCount);
- // Calculate maximum key count after compaction (for blooms)
- // Also calculate earliest put timestamp if major compaction
- int maxKeyCount = 0;
- long earliestPutTs = HConstants.LATEST_TIMESTAMP;
- ScanType scanType = request.isMajor()
- ? ScanType.COMPACT_DROP_DELETES : ScanType.COMPACT_RETAIN_DELETES;
- for (StoreFile file: filesToCompact) {
- StoreFile.Reader r = file.getReader();
- if (r == null) {
- LOG.warn("Null reader for " + file.getPath());
- continue;
- }
- // NOTE: getFilterEntries could cause under-sized blooms if the user
- // switches bloom type (e.g. from ROW to ROWCOL)
- long keyCount = (r.getBloomFilterType() == store.getFamily().getBloomFilterType())?
- r.getFilterEntries() : r.getEntries();
- maxKeyCount += keyCount;
- // For major compactions calculate the earliest put timestamp of all
- // involved storefiles. This is used to remove family delete marker during
- // compaction.
- if (scanType == ScanType.COMPACT_DROP_DELETES) {
- byte [] tmp = r.loadFileInfo().get(StoreFile.EARLIEST_PUT_TS);
- if (tmp == null) {
- // There's a file with no information, must be an old one
- // assume we have very old puts
- earliestPutTs = HConstants.OLDEST_TIMESTAMP;
- } else {
- earliestPutTs = Math.min(earliestPutTs, Bytes.toLong(tmp));
- }
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Compacting " + file +
- ", keycount=" + keyCount +
- ", bloomtype=" + r.getBloomFilterType().toString() +
- ", size=" + StringUtils.humanReadableInt(r.length()) +
- ", encoding=" + r.getHFileReader().getEncodingOnDisk() +
- (majorCompaction? ", earliestPutTs=" + earliestPutTs: ""));
- }
- }
-
- // keep track of compaction progress
- this.progress = new CompactionProgress(maxKeyCount);
-
- // For each file, obtain a scanner:
- List<StoreFileScanner> scanners = StoreFileScanner
- .getScannersForStoreFiles(filesToCompact, false, false, true);
+ List<StoreFileScanner> scanners = createFileScanners(request.getFiles());
- // Get some configs
- int compactionKVMax = this.conf.getInt(HConstants.COMPACTION_KV_MAX, 10);
- Compression.Algorithm compression = store.getFamily().getCompression();
- // Avoid overriding compression setting for major compactions if the user
- // has not specified it separately
- Compression.Algorithm compactionCompression =
- (store.getFamily().getCompactionCompression() != Compression.Algorithm.NONE) ?
- store.getFamily().getCompactionCompression(): compression;
- // Make the instantiation lazy in case compaction produces no product; i.e.
- // where all source cells are expired or deleted.
StoreFile.Writer writer = null;
List<Path> newFiles = new ArrayList<Path>();
// Find the smallest read point across all the Scanners.
- long smallestReadPoint = store.getSmallestReadPoint();
- MultiVersionConsistencyControl.setThreadReadPoint(smallestReadPoint);
+ long smallestReadPoint = setSmallestReadPoint();
try {
InternalScanner scanner = null;
try {
- if (store.getCoprocessorHost() != null) {
- scanner = store.getCoprocessorHost()
- .preCompactScannerOpen(store, scanners, scanType, earliestPutTs, request);
- }
+ /* Include deletes, unless we are doing a major compaction */
+ ScanType scanType =
+ request.isMajor() ? ScanType.COMPACT_DROP_DELETES : ScanType.COMPACT_RETAIN_DELETES;
+ scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners);
if (scanner == null) {
- Scan scan = new Scan();
- scan.setMaxVersions(store.getFamily().getMaxVersions());
- /* Include deletes, unless we are doing a major compaction */
- scanner = new StoreScanner(store, store.getScanInfo(), scan, scanners,
- scanType, smallestReadPoint, earliestPutTs);
- }
- if (store.getCoprocessorHost() != null) {
- InternalScanner cpScanner = store.getCoprocessorHost().preCompact(store, scanner,
- scanType, request);
- // NULL scanner returned from coprocessor hooks means skip normal processing
- if (cpScanner == null) {
- return newFiles; // an empty list
- }
- scanner = cpScanner;
+ scanner = createScanner(store, scanners, scanType, smallestReadPoint, fd.earliestPutTs);
}
-
- int bytesWritten = 0;
- // Since scanner.next() can return 'false' but still be delivering data,
- // we have to use a do/while loop.
- List<KeyValue> kvs = new ArrayList<KeyValue>();
- // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
- int closeCheckInterval = HStore.getCloseCheckInterval();
- boolean hasMore;
- do {
- hasMore = scanner.next(kvs, compactionKVMax);
- // Create the writer even if no kv(Empty store file is also ok),
- // because we need record the max seq id for the store file, see
- // HBASE-6059
- if (writer == null) {
- writer = store.createWriterInTmp(maxKeyCount, compactionCompression, true);
- }
- if (writer != null) {
- // output to writer:
- for (KeyValue kv : kvs) {
- if (kv.getMemstoreTS() <= smallestReadPoint) {
- kv.setMemstoreTS(0);
- }
- writer.append(kv);
- // update progress per key
- ++progress.currentCompactedKVs;
-
- // check periodically to see if a system stop is requested
- if (closeCheckInterval > 0) {
- bytesWritten += kv.getLength();
- if (bytesWritten > closeCheckInterval) {
- bytesWritten = 0;
- isInterrupted(store, writer);
- }
- }
- }
- }
- kvs.clear();
- } while (hasMore);
- } finally {
- if (scanner != null) {
- scanner.close();
+ scanner = postCreateCoprocScanner(request, scanType, scanner);
+ if (scanner == null) {
+ // NULL scanner returned from coprocessor hooks means skip normal processing.
+ return newFiles;
}
+ // Create the writer even if there are no KVs (an empty store file is also ok),
+ // because we need to record the max seq id for the store file; see HBASE-6059
+ writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true);
+ boolean finished = performCompaction(scanner, writer, smallestReadPoint);
+ if (!finished) {
+ abortWriter(writer);
+ writer = null;
+ throw new InterruptedIOException("Aborting compaction of store " + store +
+ " in region " + store.getRegionInfo().getRegionNameAsString() +
+ " because it was interrupted.");
+ }
+ } finally {
+ if (scanner != null) {
+ scanner.close();
+ }
}
} finally {
if (writer != null) {
- writer.appendMetadata(maxId, majorCompaction);
+ writer.appendMetadata(fd.maxSeqId, request.isMajor());
writer.close();
newFiles.add(writer.getPath());
}
}
return newFiles;
}
-
- void isInterrupted(final Store store, final StoreFile.Writer writer)
- throws IOException {
- if (store.areWritesEnabled()) return;
- // Else cleanup.
- writer.close();
- store.getFileSystem().delete(writer.getPath(), false);
- throw new InterruptedIOException( "Aborting compaction of store " + store +
- " in region " + store.getRegionInfo().getRegionNameAsString() +
- " because it was interrupted.");
- }
}