Posted to commits@hbase.apache.org by st...@apache.org on 2012/12/06 06:59:51 UTC
svn commit: r1417740 - in /hbase/branches/0.94/src:
main/java/org/apache/hadoop/hbase/io/hfile/
main/java/org/apache/hadoop/hbase/master/handler/
main/java/org/apache/hadoop/hbase/regionserver/
main/java/org/apache/hadoop/hbase/regionserver/compactions...
Author: stack
Date: Thu Dec 6 05:59:46 2012
New Revision: 1417740
URL: http://svn.apache.org/viewvc?rev=1417740&view=rev
Log:
HBASE-7253 Backport Compaction Tool to 0.94; REVERT
Removed:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java
Modified:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/ChecksumType.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java?rev=1417740&r1=1417739&r2=1417740&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java Thu Dec 6 05:59:46 2012
@@ -645,7 +645,7 @@ public class LruBlockCache implements Bl
// Log size
long totalSize = heapSize();
long freeSize = maxSize - totalSize;
- LruBlockCache.LOG.debug("Stats: " +
+ LruBlockCache.LOG.debug("LRU Stats: " +
"total=" + StringUtils.byteDesc(totalSize) + ", " +
"free=" + StringUtils.byteDesc(freeSize) + ", " +
"max=" + StringUtils.byteDesc(this.maxSize) + ", " +
@@ -653,11 +653,11 @@ public class LruBlockCache implements Bl
"accesses=" + stats.getRequestCount() + ", " +
"hits=" + stats.getHitCount() + ", " +
"hitRatio=" +
- (stats.getHitCount() == 0 ? "0" : (StringUtils.formatPercent(stats.getHitRatio(), 2)+ ", ")) + ", " +
+ (stats.getHitCount() == 0 ? "0" : (StringUtils.formatPercent(stats.getHitRatio(), 2)+ ", ")) +
"cachingAccesses=" + stats.getRequestCachingCount() + ", " +
"cachingHits=" + stats.getHitCachingCount() + ", " +
"cachingHitsRatio=" +
- (stats.getHitCachingCount() == 0 ? "0" : (StringUtils.formatPercent(stats.getHitCachingRatio(), 2)+ ", ")) + ", " +
+ (stats.getHitCachingCount() == 0 ? "0" : (StringUtils.formatPercent(stats.getHitCachingRatio(), 2)+ ", ")) +
"evictions=" + stats.getEvictionCount() + ", " +
"evicted=" + stats.getEvictedCount() + ", " +
"evictedPerRun=" + stats.evictedPerEviction());
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java?rev=1417740&r1=1417739&r2=1417740&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java Thu Dec 6 05:59:46 2012
@@ -142,12 +142,16 @@ public class CreateTableHandler extends
List<HRegionInfo> regionInfos = new ArrayList<HRegionInfo>();
final int batchSize =
this.conf.getInt("hbase.master.createtable.batchsize", 100);
+ HLog hlog = null;
for (int regionIdx = 0; regionIdx < this.newRegions.length; regionIdx++) {
HRegionInfo newRegion = this.newRegions[regionIdx];
// 1. Create HRegion
HRegion region = HRegion.createHRegion(newRegion,
this.fileSystemManager.getRootDir(), this.conf,
- this.hTableDescriptor, null, false, true);
+ this.hTableDescriptor, hlog);
+ if (hlog == null) {
+ hlog = region.getLog();
+ }
regionInfos.add(region.getRegionInfo());
if (regionIdx % batchSize == 0) {
@@ -159,6 +163,7 @@ public class CreateTableHandler extends
// 3. Close the new region to flush to disk. Close log file too.
region.close();
}
+ hlog.closeAndDelete();
if (regionInfos.size() > 0) {
MetaEditor.addRegionsToMeta(this.catalogTracker, regionInfos);
}
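[For reference, the restored flow above amounts to: create the first region with a null HLog so createHRegion builds one, reuse that log for every subsequent region, then discard it once all regions are closed. A condensed sketch against the 0.94 signatures shown in this diff; the wrapper class and parameter names are placeholders:

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.HLog;

class SharedHLogCreateSketch {
  static List<HRegionInfo> createRegions(HRegionInfo[] newRegions, Path rootDir,
      Configuration conf, HTableDescriptor htd) throws java.io.IOException {
    List<HRegionInfo> regionInfos = new ArrayList<HRegionInfo>();
    HLog hlog = null;
    for (HRegionInfo newRegion : newRegions) {
      // First pass: hlog == null, so createHRegion builds a fresh HLog.
      // Later passes reuse that same log instead of creating one per region.
      HRegion region = HRegion.createHRegion(newRegion, rootDir, conf, htd, hlog);
      if (hlog == null) {
        hlog = region.getLog();
      }
      regionInfos.add(region.getRegionInfo());
      region.close();              // flush the new region to disk
    }
    if (hlog != null) {
      hlog.closeAndDelete();       // bootstrap log not needed once regions close
    }
    return regionInfos;
  }
}
]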
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1417740&r1=1417739&r2=1417740&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Thu Dec 6 05:59:46 2012
@@ -20,7 +20,6 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.EOFException;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UnsupportedEncodingException;
@@ -63,7 +62,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -232,12 +230,12 @@ public class HRegion implements HeapSize
* The directory for the table this region is part of.
* This directory contains the directory for this region.
*/
- private final Path tableDir;
+ final Path tableDir;
- private final HLog log;
- private final FileSystem fs;
- private final Configuration conf;
- private final int rowLockWaitDuration;
+ final HLog log;
+ final FileSystem fs;
+ final Configuration conf;
+ final int rowLockWaitDuration;
static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
// The internal wait duration to acquire a lock before read/update
@@ -258,8 +256,8 @@ public class HRegion implements HeapSize
// purge timeout, when a RPC call will be terminated by the RPC engine.
final long maxBusyWaitDuration;
- private final HRegionInfo regionInfo;
- private final Path regiondir;
+ final HRegionInfo regionInfo;
+ final Path regiondir;
KeyValue.KVComparator comparator;
private ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
@@ -726,7 +724,7 @@ public class HRegion implements HeapSize
public long addAndGetGlobalMemstoreSize(long memStoreSize) {
if (this.rsAccounting != null) {
rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
- }
+ }
return this.memstoreSize.getAndAdd(memStoreSize);
}
@@ -752,7 +750,7 @@ public class HRegion implements HeapSize
// and then create the file
Path tmpPath = new Path(getTmpDir(), REGIONINFO_FILE);
-
+
// if datanode crashes or if the RS goes down just before the close is called while trying to
// close the created regioninfo file in the .tmp directory then on next
// creation we will be getting AlreadyCreatedException.
@@ -760,7 +758,7 @@ public class HRegion implements HeapSize
if (FSUtils.isExists(fs, tmpPath)) {
FSUtils.delete(fs, tmpPath, true);
}
-
+
FSDataOutputStream out = FSUtils.create(fs, tmpPath, perms);
try {
@@ -777,26 +775,6 @@ public class HRegion implements HeapSize
}
}
- /**
- * @param fs
- * @param dir
- * @return An HRegionInfo instance gotten from the <code>.regioninfo</code> file under region dir
- * @throws IOException
- */
- public static HRegionInfo loadDotRegionInfoFileContent(final FileSystem fs, final Path dir)
- throws IOException {
- Path regioninfo = new Path(dir, HRegion.REGIONINFO_FILE);
- if (!fs.exists(regioninfo)) throw new FileNotFoundException(regioninfo.toString());
- FSDataInputStream in = fs.open(regioninfo);
- try {
- HRegionInfo hri = new HRegionInfo();
- hri.readFields(in);
- return hri;
- } finally {
- in.close();
- }
- }
-
/** @return a HRegionInfo object for this region */
public HRegionInfo getRegionInfo() {
return this.regionInfo;
@@ -1043,16 +1021,19 @@ public class HRegion implements HeapSize
return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
}
- static ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
+ private ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
final String threadNamePrefix) {
- return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
- new ThreadFactory() {
- private int count = 1;
-
- public Thread newThread(Runnable r) {
- return new Thread(r, threadNamePrefix + "-" + count++);
- }
- });
+ ThreadPoolExecutor openAndCloseThreadPool = Threads
+ .getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
+ new ThreadFactory() {
+ private int count = 1;
+
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r, threadNamePrefix + "-" + count++);
+ return t;
+ }
+ });
+ return openAndCloseThreadPool;
}
/**
@@ -1998,7 +1979,7 @@ public class HRegion implements HeapSize
System.arraycopy(putsAndLocks, 0, mutationsAndLocks, 0, putsAndLocks.length);
return batchMutate(mutationsAndLocks);
}
-
+
/**
* Perform a batch of mutations.
* It supports only Put and Delete mutations and will ignore other types passed.
@@ -2352,7 +2333,7 @@ public class HRegion implements HeapSize
// do after lock
final long netTimeMs = EnvironmentEdgeManager.currentTimeMillis()- startTimeMs;
-
+
// See if the column families were consistent through the whole thing.
// if they were then keep them. If they were not then pass a null.
// null will be treated as unknown.
@@ -2655,7 +2636,7 @@ public class HRegion implements HeapSize
// do after lock
final long after = EnvironmentEdgeManager.currentTimeMillis();
this.opMetrics.updatePutMetrics(familyMap.keySet(), after - now);
-
+
if (flush) {
// Request a cache flush. Do it outside update lock.
requestFlush();
@@ -3778,7 +3759,6 @@ public class HRegion implements HeapSize
* @param conf
* @param hTableDescriptor
* @param hlog shared HLog
- * @param boolean initialize - true to initialize the region
* @return new HRegion
*
* @throws IOException
@@ -3786,36 +3766,7 @@ public class HRegion implements HeapSize
public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
final Configuration conf,
final HTableDescriptor hTableDescriptor,
- final HLog hlog,
- final boolean initialize)
- throws IOException {
- return createHRegion(info, rootDir, conf, hTableDescriptor,
- hlog, initialize, false);
- }
-
- /**
- * Convenience method creating new HRegions. Used by createTable.
- * The {@link HLog} for the created region needs to be closed
- * explicitly, if it is not null.
- * Use {@link HRegion#getLog()} to get access.
- *
- * @param info Info for region to create.
- * @param rootDir Root directory for HBase instance
- * @param conf
- * @param hTableDescriptor
- * @param hlog shared HLog
- * @param boolean initialize - true to initialize the region
- * @param boolean ignoreHLog
- - true to skip generate new hlog if it is null, mostly for createTable
- * @return new HRegion
- *
- * @throws IOException
- */
- public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
- final Configuration conf,
- final HTableDescriptor hTableDescriptor,
- final HLog hlog,
- final boolean initialize, final boolean ignoreHLog)
+ final HLog hlog)
throws IOException {
LOG.info("creating HRegion " + info.getTableNameAsString()
+ " HTD == " + hTableDescriptor + " RootDir = " + rootDir +
@@ -3827,26 +3778,16 @@ public class HRegion implements HeapSize
FileSystem fs = FileSystem.get(conf);
fs.mkdirs(regionDir);
HLog effectiveHLog = hlog;
- if (hlog == null && !ignoreHLog) {
+ if (hlog == null) {
effectiveHLog = new HLog(fs, new Path(regionDir, HConstants.HREGION_LOGDIR_NAME),
new Path(regionDir, HConstants.HREGION_OLDLOGDIR_NAME), conf);
}
HRegion region = HRegion.newHRegion(tableDir,
effectiveHLog, fs, conf, info, hTableDescriptor, null);
- if (initialize) {
- region.initialize();
- }
+ region.initialize();
return region;
}
- public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
- final Configuration conf,
- final HTableDescriptor hTableDescriptor,
- final HLog hlog)
- throws IOException {
- return createHRegion(info, rootDir, conf, hTableDescriptor, hlog, true);
- }
-
/**
* Open a Region.
* @param info Info for region to be opened.
@@ -4358,7 +4299,7 @@ public class HRegion implements HeapSize
// do after lock
final long after = EnvironmentEdgeManager.currentTimeMillis();
this.opMetrics.updateGetMetrics(get.familySet(), after - now);
-
+
return results;
}
@@ -4686,10 +4627,10 @@ public class HRegion implements HeapSize
closeRegionOperation();
}
-
+
long after = EnvironmentEdgeManager.currentTimeMillis();
this.opMetrics.updateAppendMetrics(append.getFamilyMap().keySet(), after - before);
-
+
if (flush) {
// Request a cache flush. Do it outside update lock.
requestFlush();
@@ -4814,7 +4755,7 @@ public class HRegion implements HeapSize
long after = EnvironmentEdgeManager.currentTimeMillis();
this.opMetrics.updateIncrementMetrics(increment.getFamilyMap().keySet(), after - before);
}
-
+
if (flush) {
// Request a cache flush. Do it outside update lock.
requestFlush();
@@ -5307,7 +5248,7 @@ public class HRegion implements HeapSize
*/
private void recordPutWithoutWal(final Map<byte [], List<KeyValue>> familyMap) {
if (numPutsWithoutWAL.getAndIncrement() == 0) {
- LOG.info("writing data to region " + this +
+ LOG.info("writing data to region " + this +
" with WAL disabled. Data may be lost in the event of a crash.");
}
@@ -5419,11 +5360,11 @@ public class HRegion implements HeapSize
final HLog log = new HLog(fs, logdir, oldLogDir, c);
try {
processTable(fs, tableDir, log, c, majorCompact);
- } finally {
+ } finally {
log.close();
// TODO: is this still right?
BlockCache bc = new CacheConfig(c).getBlockCache();
if (bc != null) bc.shutdown();
- }
+ }
}
}
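[For reference: the getOpenAndCloseThreadPool hunk in this file is behaviorally identical on both sides, a bounded cached pool whose ThreadFactory hands out sequentially numbered, prefix-named threads. The naming idiom in isolation, using plain java.util.concurrent in place of HBase's Threads.getBoundedCachedThreadPool; the prefix is a placeholder:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class NamedThreadsSketch {
  public static void main(String[] args) {
    final String prefix = "regionOpenClose";   // placeholder prefix
    ThreadFactory tf = new ThreadFactory() {
      private int count = 1;                   // unsynchronized counter, same
      public Thread newThread(Runnable r) {    // as in the hunk above
        return new Thread(r, prefix + "-" + count++);
      }
    };
    ExecutorService pool = Executors.newFixedThreadPool(2, tf);
    pool.execute(new Runnable() {
      public void run() {
        System.out.println("running in " + Thread.currentThread().getName());
      }
    });
    pool.shutdown();
  }
}
]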
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1417740&r1=1417739&r2=1417740&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Thu Dec 6 05:59:46 2012
@@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -131,6 +132,9 @@ public class Store extends SchemaConfigu
private volatile long totalUncompressedBytes = 0L;
private final Object flushLock = new Object();
final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ private final String storeNameStr;
+ private CompactionProgress progress;
+ private final int compactionKVMax;
private final boolean verifyBulkLoads;
/* The default priority for user-specified compaction requests.
@@ -154,6 +158,10 @@ public class Store extends SchemaConfigu
new CopyOnWriteArraySet<ChangedReadersObserver>();
private final int blocksize;
+ /** Compression algorithm for flush files and minor compaction */
+ private final Compression.Algorithm compression;
+ /** Compression algorithm for major compaction */
+ private final Compression.Algorithm compactionCompression;
private HFileDataBlockEncoder dataBlockEncoder;
/** Checksum configuration */
@@ -163,8 +171,6 @@ public class Store extends SchemaConfigu
// Comparing KeyValues
final KeyValue.KVComparator comparator;
- private final Compactor compactor;
-
/**
* Constructor
* @param basedir qualified path under which the region directory lives;
@@ -179,16 +185,25 @@ public class Store extends SchemaConfigu
protected Store(Path basedir, HRegion region, HColumnDescriptor family,
FileSystem fs, Configuration conf)
throws IOException {
- super(conf, region.getRegionInfo().getTableNameAsString(),
+ super(conf, region.getTableDesc().getNameAsString(),
Bytes.toString(family.getName()));
- HRegionInfo info = region.getRegionInfo();
+ HRegionInfo info = region.regionInfo;
this.fs = fs;
- Path p = getStoreHomedir(basedir, info.getEncodedName(), family.getName());
- this.homedir = createStoreHomeDir(this.fs, p);
+ this.homedir = getStoreHomedir(basedir, info.getEncodedName(), family.getName());
+ if (!this.fs.exists(this.homedir)) {
+ if (!this.fs.mkdirs(this.homedir))
+ throw new IOException("Failed create of: " + this.homedir.toString());
+ }
this.region = region;
this.family = family;
this.conf = conf;
this.blocksize = family.getBlocksize();
+ this.compression = family.getCompression();
+ // avoid overriding compression setting for major compactions if the user
+ // has not specified it separately
+ this.compactionCompression =
+ (family.getCompactionCompression() != Compression.Algorithm.NONE) ?
+ family.getCompactionCompression() : this.compression;
this.dataBlockEncoder =
new HFileDataBlockEncoderImpl(family.getDataBlockEncodingOnDisk(),
@@ -213,6 +228,7 @@ public class Store extends SchemaConfigu
"ms in store " + this);
scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator);
this.memstore = new MemStore(conf, this.comparator);
+ this.storeNameStr = getColumnFamilyName();
// By default, compact if storefile.count >= minFilesToCompact
this.minFilesToCompact = Math.max(2,
@@ -229,8 +245,10 @@ public class Store extends SchemaConfigu
this.region.memstoreFlushSize);
this.maxCompactSize
= conf.getLong("hbase.hstore.compaction.max.size", Long.MAX_VALUE);
+ this.compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, 10);
- this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
+ this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify",
+ false);
if (Store.closeCheckInterval == 0) {
Store.closeCheckInterval = conf.getInt(
@@ -242,47 +260,6 @@ public class Store extends SchemaConfigu
this.checksumType = getChecksumType(conf);
// initilize bytes per checksum
this.bytesPerChecksum = getBytesPerChecksum(conf);
- // Create a compaction tool instance
- this.compactor = new Compactor(this.conf);
- }
-
- /**
- * @param family
- * @return
- */
- long getTTL(final HColumnDescriptor family) {
- // HCD.getTimeToLive returns ttl in seconds. Convert to milliseconds.
- long ttl = family.getTimeToLive();
- if (ttl == HConstants.FOREVER) {
- // Default is unlimited ttl.
- ttl = Long.MAX_VALUE;
- } else if (ttl == -1) {
- ttl = Long.MAX_VALUE;
- } else {
- // Second -> ms adjust for user data
- ttl *= 1000;
- }
- return ttl;
- }
-
- /**
- * Create this store's homedir
- * @param fs
- * @param homedir
- * @return Return <code>homedir</code>
- * @throws IOException
- */
- Path createStoreHomeDir(final FileSystem fs,
- final Path homedir) throws IOException {
- if (!fs.exists(homedir)) {
- if (!fs.mkdirs(homedir))
- throw new IOException("Failed create of: " + homedir.toString());
- }
- return homedir;
- }
-
- FileSystem getFileSystem() {
- return this.fs;
}
/**
@@ -343,7 +320,7 @@ public class Store extends SchemaConfigu
* Return the directory in which this store stores its
* StoreFiles
*/
- Path getHomedir() {
+ public Path getHomedir() {
return homedir;
}
@@ -362,10 +339,6 @@ public class Store extends SchemaConfigu
this.dataBlockEncoder = blockEncoder;
}
- FileStatus [] getStoreFiles() throws IOException {
- return FSUtils.listStatus(this.fs, this.homedir, null);
- }
-
/**
* Creates an unsorted list of StoreFile loaded in parallel
* from the given directory.
@@ -373,7 +346,7 @@ public class Store extends SchemaConfigu
*/
private List<StoreFile> loadStoreFiles() throws IOException {
ArrayList<StoreFile> results = new ArrayList<StoreFile>();
- FileStatus files[] = getStoreFiles();
+ FileStatus files[] = FSUtils.listStatus(this.fs, this.homedir, null);
if (files == null || files.length == 0) {
return results;
@@ -664,7 +637,7 @@ public class Store extends SchemaConfigu
storeFileCloserThreadPool.shutdownNow();
}
}
- LOG.info("Closed " + this);
+ LOG.debug("closed " + this.storeNameStr);
return result;
} finally {
this.lock.writeLock().unlock();
@@ -750,7 +723,6 @@ public class Store extends SchemaConfigu
scanner = cpScanner;
}
try {
- int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, 10);
// TODO: We can fail in the below block before we complete adding this
// flush to list of store files. Add cleanup of anything put on filesystem
// if we fail.
@@ -764,7 +736,7 @@ public class Store extends SchemaConfigu
List<KeyValue> kvs = new ArrayList<KeyValue>();
boolean hasMore;
do {
- hasMore = scanner.next(kvs, compactionKVMax);
+ hasMore = scanner.next(kvs, this.compactionKVMax);
if (!kvs.isEmpty()) {
for (KeyValue kv : kvs) {
// If we know that this KV is going to be included always, then let us
@@ -856,7 +828,7 @@ public class Store extends SchemaConfigu
*/
private StoreFile.Writer createWriterInTmp(int maxKeyCount)
throws IOException {
- return createWriterInTmp(maxKeyCount, this.family.getCompression(), false);
+ return createWriterInTmp(maxKeyCount, this.compression, false);
}
/*
@@ -1009,12 +981,16 @@ public class Store extends SchemaConfigu
* @param cr
* compaction details obtained from requestCompaction()
* @throws IOException
- * @return Storefile we compacted into or null if we failed or opted out early.
*/
- StoreFile compact(CompactionRequest cr) throws IOException {
- if (cr == null || cr.getFiles().isEmpty()) return null;
- Preconditions.checkArgument(cr.getStore().toString().equals(this.toString()));
+ void compact(CompactionRequest cr) throws IOException {
+ if (cr == null || cr.getFiles().isEmpty()) {
+ return;
+ }
+ Preconditions.checkArgument(cr.getStore().toString()
+ .equals(this.toString()));
+
List<StoreFile> filesToCompact = cr.getFiles();
+
synchronized (filesCompacting) {
// sanity check: we're compacting files that this store knows about
// TODO: change this to LOG.error() after more debugging
@@ -1026,26 +1002,19 @@ public class Store extends SchemaConfigu
// Ready to go. Have list of files to compact.
LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
- + this + " of "
+ + this.storeNameStr + " of "
+ this.region.getRegionInfo().getRegionNameAsString()
+ " into tmpdir=" + region.getTmpDir() + ", seqid=" + maxId + ", totalSize="
+ StringUtils.humanReadableInt(cr.getSize()));
StoreFile sf = null;
try {
- StoreFile.Writer writer =
- this.compactor.compact(this, filesToCompact, cr.isMajor(), maxId);
+ StoreFile.Writer writer = compactStore(filesToCompact, cr.isMajor(),
+ maxId);
// Move the compaction into place.
- if (this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
- sf = completeCompaction(filesToCompact, writer);
- if (region.getCoprocessorHost() != null) {
- region.getCoprocessorHost().postCompact(this, sf);
- }
- } else {
- // Create storefile around what we wrote with a reader on it.
- sf = new StoreFile(this.fs, writer.getPath(), this.conf, this.cacheConf,
- this.family.getBloomFilterType(), this.dataBlockEncoder);
- sf.createReader();
+ sf = completeCompaction(filesToCompact, writer);
+ if (region.getCoprocessorHost() != null) {
+ region.getCoprocessorHost().postCompact(this, sf);
}
} finally {
synchronized (filesCompacting) {
@@ -1054,7 +1023,7 @@ public class Store extends SchemaConfigu
}
LOG.info("Completed" + (cr.isMajor() ? " major " : " ") + "compaction of "
- + filesToCompact.size() + " file(s) in " + this + " of "
+ + filesToCompact.size() + " file(s) in " + this.storeNameStr + " of "
+ this.region.getRegionInfo().getRegionNameAsString()
+ " into " +
(sf == null ? "none" : sf.getPath().getName()) +
@@ -1062,7 +1031,6 @@ public class Store extends SchemaConfigu
StringUtils.humanReadableInt(sf.getReader().length()))
+ "; total size for store is "
+ StringUtils.humanReadableInt(storeSize));
- return sf;
}
/**
@@ -1102,8 +1070,7 @@ public class Store extends SchemaConfigu
try {
// Ready to go. Have list of files to compact.
- StoreFile.Writer writer =
- this.compactor.compact(this, filesToCompact, isMajor, maxId);
+ StoreFile.Writer writer = compactStore(filesToCompact, isMajor, maxId);
// Move the compaction into place.
StoreFile sf = completeCompaction(filesToCompact, writer);
if (region.getCoprocessorHost() != null) {
@@ -1152,10 +1119,10 @@ public class Store extends SchemaConfigu
}
/** getter for CompactionProgress object
- * @return CompactionProgress object; can be null
+ * @return CompactionProgress object
*/
public CompactionProgress getCompactionProgress() {
- return this.compactor.getProgress();
+ return this.progress;
}
/*
@@ -1207,19 +1174,19 @@ public class Store extends SchemaConfigu
if (sf.isMajorCompaction() &&
(this.ttl == HConstants.FOREVER || oldest < this.ttl)) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Skipping major compaction of " + this +
+ LOG.debug("Skipping major compaction of " + this.storeNameStr +
" because one (major) compacted file only and oldestTime " +
oldest + "ms is < ttl=" + this.ttl);
}
} else if (this.ttl != HConstants.FOREVER && oldest > this.ttl) {
- LOG.debug("Major compaction triggered on store " + this +
+ LOG.debug("Major compaction triggered on store " + this.storeNameStr +
", because keyvalues outdated; time since last major compaction " +
(now - lowTimestamp) + "ms");
result = true;
}
} else {
if (LOG.isDebugEnabled()) {
- LOG.debug("Major compaction triggered on store " + this +
+ LOG.debug("Major compaction triggered on store " + this.storeNameStr +
"; time since last major compaction " + (now - lowTimestamp) + "ms");
}
result = true;
@@ -1409,12 +1376,12 @@ public class Store extends SchemaConfigu
compactSelection.getFilesToCompact().get(pos).getReader().length()
> maxCompactSize &&
!compactSelection.getFilesToCompact().get(pos).isReference()) ++pos;
- if (pos != 0) compactSelection.clearSubList(0, pos);
+ compactSelection.clearSubList(0, pos);
}
if (compactSelection.getFilesToCompact().isEmpty()) {
LOG.debug(this.getHRegionInfo().getEncodedName() + " - " +
- this + ": no store files to compact");
+ this.storeNameStr + ": no store files to compact");
compactSelection.emptyFileList();
return compactSelection;
}
@@ -1501,7 +1468,7 @@ public class Store extends SchemaConfigu
// if we don't have enough files to compact, just wait
if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Skipped compaction of " + this
+ LOG.debug("Skipped compaction of " + this.storeNameStr
+ ". Only " + (end - start) + " file(s) of size "
+ StringUtils.humanReadableInt(totalSize)
+ " have met compaction criteria.");
@@ -1528,6 +1495,149 @@ public class Store extends SchemaConfigu
}
/**
+ * Do a minor/major compaction on an explicit set of storefiles in a Store.
+ * Uses the scan infrastructure to make it easy.
+ *
+ * @param filesToCompact which files to compact
+ * @param majorCompaction true to major compact (prune all deletes, max versions, etc)
+ * @param maxId Readers maximum sequence id.
+ * @return Product of compaction or null if all cells expired or deleted and
+ * nothing made it through the compaction.
+ * @throws IOException
+ */
+ StoreFile.Writer compactStore(final Collection<StoreFile> filesToCompact,
+ final boolean majorCompaction, final long maxId)
+ throws IOException {
+ // calculate maximum key count after compaction (for blooms)
+ int maxKeyCount = 0;
+ long earliestPutTs = HConstants.LATEST_TIMESTAMP;
+ for (StoreFile file : filesToCompact) {
+ StoreFile.Reader r = file.getReader();
+ if (r != null) {
+ // NOTE: getFilterEntries could cause under-sized blooms if the user
+ // switches bloom type (e.g. from ROW to ROWCOL)
+ long keyCount = (r.getBloomFilterType() == family.getBloomFilterType())
+ ? r.getFilterEntries() : r.getEntries();
+ maxKeyCount += keyCount;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Compacting " + file +
+ ", keycount=" + keyCount +
+ ", bloomtype=" + r.getBloomFilterType().toString() +
+ ", size=" + StringUtils.humanReadableInt(r.length()) +
+ ", encoding=" + r.getHFileReader().getEncodingOnDisk());
+ }
+ }
+ // For major compactions calculate the earliest put timestamp
+ // of all involved storefiles. This is used to remove
+ // family delete marker during the compaction.
+ if (majorCompaction) {
+ byte[] tmp = r.loadFileInfo().get(StoreFile.EARLIEST_PUT_TS);
+ if (tmp == null) {
+ // there's a file with no information, must be an old one
+ // assume we have very old puts
+ earliestPutTs = HConstants.OLDEST_TIMESTAMP;
+ } else {
+ earliestPutTs = Math.min(earliestPutTs, Bytes.toLong(tmp));
+ }
+ }
+ }
+
+ // keep track of compaction progress
+ progress = new CompactionProgress(maxKeyCount);
+
+ // For each file, obtain a scanner:
+ List<StoreFileScanner> scanners = StoreFileScanner
+ .getScannersForStoreFiles(filesToCompact, false, false, true);
+
+ // Make the instantiation lazy in case compaction produces no product; i.e.
+ // where all source cells are expired or deleted.
+ StoreFile.Writer writer = null;
+ // Find the smallest read point across all the Scanners.
+ long smallestReadPoint = region.getSmallestReadPoint();
+ MultiVersionConsistencyControl.setThreadReadPoint(smallestReadPoint);
+ try {
+ InternalScanner scanner = null;
+ try {
+ if (getHRegion().getCoprocessorHost() != null) {
+ scanner = getHRegion()
+ .getCoprocessorHost()
+ .preCompactScannerOpen(this, scanners,
+ majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT, earliestPutTs);
+ }
+ if (scanner == null) {
+ Scan scan = new Scan();
+ scan.setMaxVersions(getFamily().getMaxVersions());
+ /* Include deletes, unless we are doing a major compaction */
+ scanner = new StoreScanner(this, getScanInfo(), scan, scanners,
+ majorCompaction? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT,
+ smallestReadPoint, earliestPutTs);
+ }
+ if (getHRegion().getCoprocessorHost() != null) {
+ InternalScanner cpScanner =
+ getHRegion().getCoprocessorHost().preCompact(this, scanner);
+ // NULL scanner returned from coprocessor hooks means skip normal processing
+ if (cpScanner == null) {
+ return null;
+ }
+ scanner = cpScanner;
+ }
+
+ int bytesWritten = 0;
+ // since scanner.next() can return 'false' but still be delivering data,
+ // we have to use a do/while loop.
+ ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
+ // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
+ boolean hasMore;
+ do {
+ hasMore = scanner.next(kvs, this.compactionKVMax);
+ if (writer == null && !kvs.isEmpty()) {
+ writer = createWriterInTmp(maxKeyCount, this.compactionCompression,
+ true);
+ }
+ if (writer != null) {
+ // output to writer:
+ for (KeyValue kv : kvs) {
+ if (kv.getMemstoreTS() <= smallestReadPoint) {
+ kv.setMemstoreTS(0);
+ }
+ writer.append(kv);
+ // update progress per key
+ ++progress.currentCompactedKVs;
+
+ // check periodically to see if a system stop is requested
+ if (Store.closeCheckInterval > 0) {
+ bytesWritten += kv.getLength();
+ if (bytesWritten > Store.closeCheckInterval) {
+ bytesWritten = 0;
+ if (!this.region.areWritesEnabled()) {
+ writer.close();
+ fs.delete(writer.getPath(), false);
+ throw new InterruptedIOException(
+ "Aborting compaction of store " + this +
+ " in region " + this.region +
+ " because user requested stop.");
+ }
+ }
+ }
+ }
+ }
+ kvs.clear();
+ } while (hasMore);
+ } finally {
+ if (scanner != null) {
+ scanner.close();
+ }
+ }
+ } finally {
+ if (writer != null) {
+ writer.appendMetadata(maxId, majorCompaction);
+ writer.close();
+ }
+ }
+ return writer;
+ }
+
+ /**
* Validates a store file by opening and closing it. In HFileV2 this should
* not be an expensive operation.
*
@@ -1631,7 +1741,7 @@ public class Store extends SchemaConfigu
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
- LOG.error("Failed replacing compacted files in " + this +
+ LOG.error("Failed replacing compacted files in " + this.storeNameStr +
". Compacted file is " + (result == null? "none": result.toString()) +
". Files replaced " + compactedFiles.toString() +
" some of which may have been already removed", e);
@@ -1917,7 +2027,7 @@ public class Store extends SchemaConfigu
return mk.getRow();
}
} catch(IOException e) {
- LOG.warn("Failed getting store size for " + this, e);
+ LOG.warn("Failed getting store size for " + this.storeNameStr, e);
} finally {
this.lock.readLock().unlock();
}
@@ -1970,7 +2080,7 @@ public class Store extends SchemaConfigu
@Override
public String toString() {
- return getColumnFamilyName();
+ return this.storeNameStr;
}
/**
@@ -2086,7 +2196,7 @@ public class Store extends SchemaConfigu
}
HRegionInfo getHRegionInfo() {
- return this.region.getRegionInfo();
+ return this.region.regionInfo;
}
/**
@@ -2214,8 +2324,8 @@ public class Store extends SchemaConfigu
public static final long FIXED_OVERHEAD =
ClassSize.align(SchemaConfigured.SCHEMA_CONFIGURED_UNALIGNED_HEAP_SIZE +
- + (17 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG)
- + (5 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
+ + (20 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG)
+ + (6 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
+ ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
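[One subtlety in the restored compactStore above, called out in its own comment: InternalScanner.next(kvs, limit) can return false and still have delivered KeyValues on that same call, which is why the copy loop is a do/while and why progress.currentCompactedKVs is bumped per appended cell. The loop shape in isolation, with a toy scanner standing in for StoreScanner and hypothetical data:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CompactionLoopSketch {
  // Stand-in for InternalScanner: next(batch, limit) may return false while
  // still having appended the final rows to 'batch' on that same call.
  interface BatchScanner {
    boolean next(List<String> batch, int limit);
  }

  public static void main(String[] args) {
    final List<String> source =
        new ArrayList<String>(Arrays.asList("a", "b", "c", "d", "e"));
    BatchScanner scanner = new BatchScanner() {
      public boolean next(List<String> batch, int limit) {
        int n = Math.min(limit, source.size());
        batch.addAll(source.subList(0, n));
        source.subList(0, n).clear();
        return !source.isEmpty();     // false on the draining call
      }
    };

    long compactedKVs = 0;
    List<String> kvs = new ArrayList<String>();
    boolean hasMore;
    do {                                // do/while, not while: the batch from
      hasMore = scanner.next(kvs, 2);   // a 'false' call still gets processed
      for (String kv : kvs) {
        compactedKVs++;                 // mirrors ++progress.currentCompactedKVs
      }
      kvs.clear();
    } while (hasMore);
    System.out.println("compacted " + compactedKVs + " cells");  // prints 5
  }
}
]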
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java?rev=1417740&r1=1417739&r2=1417740&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java Thu Dec 6 05:59:46 2012
@@ -49,4 +49,5 @@ public class CompactionProgress {
public float getProgressPct() {
return currentCompactedKVs / totalCompactingKVs;
}
+
}
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/ChecksumType.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/ChecksumType.java?rev=1417740&r1=1417739&r2=1417740&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/ChecksumType.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/ChecksumType.java Thu Dec 6 05:59:46 2012
@@ -25,6 +25,9 @@ import java.util.zip.Checksum;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.ChecksumFactory;
+
/**
* Checksum types. The Checksum type is a one byte number
* that stores a representation of the checksum algorithm
@@ -67,7 +70,7 @@ public enum ChecksumType {
ctor = ChecksumFactory.newConstructor(PURECRC32);
LOG.info("Checksum using " + PURECRC32);
} catch (Exception e) {
- LOG.trace(PURECRC32 + " not available.");
+ LOG.info(PURECRC32 + " not available.");
}
try {
// The default checksum class name is java.util.zip.CRC32.
@@ -77,7 +80,7 @@ public enum ChecksumType {
LOG.info("Checksum can use " + JDKCRC);
}
} catch (Exception e) {
- LOG.trace(JDKCRC + " not available.");
+ LOG.warn(JDKCRC + " not available. ", e);
}
}
@@ -110,7 +113,7 @@ public enum ChecksumType {
ctor = ChecksumFactory.newConstructor(PURECRC32C);
LOG.info("Checksum can use " + PURECRC32C);
} catch (Exception e) {
- LOG.trace(PURECRC32C + " not available.");
+ LOG.info(PURECRC32C + " not available. ");
}
}
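[For reference: the hunks above only retune log levels around the same probe; the enum's static initializer tries to resolve an optional checksum implementation reflectively (via ChecksumFactory.newConstructor in the real code) and logs a miss instead of failing. A generic sketch of that availability-probe pattern; the class name here is hypothetical:

import java.lang.reflect.Constructor;
import java.util.zip.Checksum;

public class ChecksumProbe {
  // Hypothetical optional implementation; the real code resolves its class
  // names through ChecksumFactory.newConstructor(...) as shown above.
  static final String PURECRC32 = "org.example.FastCrc32";

  public static void main(String[] args) {
    Constructor<?> ctor = null;
    try {
      Class<?> clazz = Class.forName(PURECRC32);
      ctor = clazz.asSubclass(Checksum.class).getDeclaredConstructor();
      System.out.println("Checksum using " + PURECRC32);
    } catch (Exception e) {
      // Same shape as the hunk above: a missing optional class is reported
      // (info/warn after this revert, trace before it), never rethrown.
      System.out.println(PURECRC32 + " not available.");
    }
  }
}
]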
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java?rev=1417740&r1=1417739&r2=1417740&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java Thu Dec 6 05:59:46 2012
@@ -151,7 +151,7 @@ public abstract class FSUtils {
*/
public static FSDataOutputStream create(FileSystem fs, Path path,
FsPermission perm, boolean overwrite) throws IOException {
- LOG.debug("Creating file=" + path + " with permission=" + perm);
+ LOG.debug("Creating file:" + path + "with permission:" + perm);
return fs.create(path, perm, overwrite,
fs.getConf().getInt("io.file.buffer.size", 4096),
@@ -1013,25 +1013,6 @@ public abstract class FSUtils {
}
/**
- * Given a particular region dir, return all the familydirs inside it
- *
- * @param fs A file system for the Path
- * @param regionDir Path to a specific region directory
- * @return List of paths to valid family directories in region dir.
- * @throws IOException
- */
- public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) throws IOException {
- // assumes we are in a region dir.
- FileStatus[] fds = fs.listStatus(regionDir, new FamilyDirFilter(fs));
- List<Path> familyDirs = new ArrayList<Path>(fds.length);
- for (FileStatus fdfs: fds) {
- Path fdPath = fdfs.getPath();
- familyDirs.add(fdPath);
- }
- return familyDirs;
- }
-
- /**
* Filter for HFiles that excludes reference files.
*/
public static class HFileFilter implements PathFilter {
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java?rev=1417740&r1=1417739&r2=1417740&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java Thu Dec 6 05:59:46 2012
@@ -587,10 +587,8 @@ public class TestCompaction extends HBas
List<StoreFile> storeFiles = store.getStorefiles();
long maxId = StoreFile.getMaxSequenceIdInList(storeFiles);
- Compactor tool = new Compactor(this.conf);
- StoreFile.Writer compactedFile =
- tool.compact(store, storeFiles, false, maxId);
+ StoreFile.Writer compactedFile = store.compactStore(storeFiles, false, maxId);
// Now lets corrupt the compacted file.
FileSystem fs = FileSystem.get(conf);