Posted to commits@hbase.apache.org by st...@apache.org on 2012/08/17 18:46:09 UTC
svn commit: r1374354 [4/5] - in /hbase/trunk/hbase-server/src:
main/java/org/apache/hadoop/hbase/coprocessor/
main/java/org/apache/hadoop/hbase/mapreduce/
main/java/org/apache/hadoop/hbase/master/
main/java/org/apache/hadoop/hbase/regionserver/ main/ja...
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1374354&r1=1374353&r2=1374354&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Fri Aug 17 16:46:07 2012
@@ -1,6 +1,4 @@
/**
- * Copyright 2010 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,2191 +18,270 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
import java.util.List;
import java.util.NavigableSet;
-import java.util.Random;
-import java.util.SortedSet;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValue.KVComparator;
-import org.apache.hadoop.hbase.RemoteExceptionHandler;
-import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.io.hfile.Compression;
-import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
-import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
-import org.apache.hadoop.hbase.io.hfile.HFileScanner;
-import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
-import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
-import org.apache.hadoop.hbase.monitoring.MonitoredTask;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
-import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
-import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ChecksumType;
-import org.apache.hadoop.hbase.util.ClassSize;
-import org.apache.hadoop.hbase.util.CollectionBackedScanner;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.util.StringUtils;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Collections2;
+import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics.SchemaAware;
+
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
/**
- * A Store holds a column family in a Region. It's a memstore and a set of zero
- * or more StoreFiles, which stretch backwards over time.
- *
- * <p>There's no reason to consider append-logging at this level; all logging
- * and locking is handled at the HRegion level. Store just provides
- * services to manage sets of StoreFiles. One of the most important of those
- * services is compaction services where files are aggregated once they pass
- * a configurable threshold.
- *
- * <p>The only thing having to do with logs that Store needs to deal with is
- * the reconstructionLog. This is a segment of an HRegion's log that might
- * NOT be present upon startup. If the param is NULL, there's nothing to do.
- * If the param is non-NULL, we need to process the log to reconstruct
- * a TreeMap that might not have been written to disk before the process
- * died.
- *
- * <p>It's assumed that after this constructor returns, the reconstructionLog
- * file will be deleted (by whoever has instantiated the Store).
- *
- * <p>Locking and transactions are handled at a higher level. This API should
- * not be called directly but by an HRegion manager.
+ * Interface for objects that hold a column family in a Region. It's a memstore and a set of zero or
+ * more StoreFiles, which stretch backwards over time.
*/
@InterfaceAudience.Private
-public class Store extends SchemaConfigured implements HStore {
- static final Log LOG = LogFactory.getLog(Store.class);
+@InterfaceStability.Evolving
+public interface Store extends SchemaAware, HeapSize {
+
+ /* The default priority for user-specified compaction requests.
+ * The user gets top priority unless we have blocking compactions. (Pri <= 0)
+ */
+ public static final int PRIORITY_USER = 1;
+ public static final int NO_PRIORITY = Integer.MIN_VALUE;
+
+ // General Accessors
+ public KeyValue.KVComparator getComparator();
+
+ public List<StoreFile> getStorefiles();
+
+ /**
+ * Close all the readers. We don't need to worry about subsequent requests because the HRegion
+ * holds a write lock that will prevent any more reads or writes.
+ * @return the {@link StoreFile StoreFiles} that were previously being used.
+ * @throws IOException on failure
+ */
+ public ImmutableList<StoreFile> close() throws IOException;
+
+ /**
+ * Return a scanner for both the memstore and the HStore files. Assumes we are not in a
+ * compaction.
+ * @param scan Scan to apply when scanning the stores
+ * @param targetCols columns to scan
+ * @return a scanner over the current key values
+ * @throws IOException on failure
+ */
+ public KeyValueScanner getScanner(Scan scan, final NavigableSet<byte[]> targetCols)
+ throws IOException;
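
For illustration, a minimal sketch of driving the scanner half of this interface. The store handle, row range, and column name are invented; the snippet assumes it lives in org.apache.hadoop.hbase.regionserver (so Store and KeyValueScanner resolve) and relies on getScanner returning a scanner already positioned at the scan's start row.

    import java.io.IOException;
    import java.util.NavigableSet;
    import java.util.TreeSet;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    /** Count the cells of one column over a row range using the store's scanner. */
    static long countCells(Store store) throws IOException {
      Scan scan = new Scan(Bytes.toBytes("row100"), Bytes.toBytes("row200"));
      NavigableSet<byte[]> cols = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
      cols.add(Bytes.toBytes("q1"));
      KeyValueScanner scanner = store.getScanner(scan, cols);
      try {
        long n = 0;
        while (scanner.next() != null) {
          n++;
        }
        return n;
      } finally {
        scanner.close();
      }
    }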
- protected final MemStore memstore;
- // This store's directory in the filesystem.
- private final Path homedir;
- private final HRegion region;
- private final HColumnDescriptor family;
- final FileSystem fs;
- final Configuration conf;
- final CacheConfig cacheConf;
- // ttl in milliseconds.
- private long ttl;
- private final int minFilesToCompact;
- private final int maxFilesToCompact;
- private final long minCompactSize;
- private final long maxCompactSize;
- private long lastCompactSize = 0;
- volatile boolean forceMajor = false;
- /* how many bytes to write between status checks */
- static int closeCheckInterval = 0;
- private final int blockingStoreFileCount;
- private volatile long storeSize = 0L;
- private volatile long totalUncompressedBytes = 0L;
- private final Object flushLock = new Object();
- final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
- private final boolean verifyBulkLoads;
-
- // not private for testing
- /* package */ScanInfo scanInfo;
- /*
- * List of store files inside this store. This is an immutable list that
- * is atomically replaced when its contents change.
- */
- private ImmutableList<StoreFile> storefiles = null;
-
- List<StoreFile> filesCompacting = Lists.newArrayList();
-
- // All access must be synchronized.
- private final CopyOnWriteArraySet<ChangedReadersObserver> changedReaderObservers =
- new CopyOnWriteArraySet<ChangedReadersObserver>();
-
- private final int blocksize;
- private HFileDataBlockEncoder dataBlockEncoder;
-
- /** Checksum configuration */
- private ChecksumType checksumType;
- private int bytesPerChecksum;
-
- // Comparing KeyValues
- final KeyValue.KVComparator comparator;
-
- private final Compactor compactor;
-
- /**
- * Constructor
- * @param basedir qualified path under which the region directory lives;
- * generally the table subdirectory
- * @param region
- * @param family HColumnDescriptor for this column
- * @param fs file system object
- * @param confParam configuration object
- * failed. Can be null.
+ /**
+ * Updates the value for the given row/family/qualifier. This function will always be seen as
+ * atomic by other readers because it only puts a single KV to memstore. Thus no read/write
+ * control is necessary.
+ * @param row row to update
+ * @param f family to update
+ * @param qualifier qualifier to update
+ * @param newValue the new value to set into memstore
+ * @return memstore size delta
* @throws IOException
*/
- protected Store(Path basedir, HRegion region, HColumnDescriptor family,
- FileSystem fs, Configuration confParam)
- throws IOException {
- super(new CompoundConfiguration().add(confParam).add(
- family.getValues()), region.getRegionInfo().getTableNameAsString(),
- Bytes.toString(family.getName()));
- HRegionInfo info = region.getRegionInfo();
- this.fs = fs;
- // Assemble the store's home directory.
- Path p = getStoreHomedir(basedir, info.getEncodedName(), family.getName());
- // Ensure it exists.
- this.homedir = createStoreHomeDir(this.fs, p);
- this.region = region;
- this.family = family;
- // 'conf' renamed to 'confParam' b/c we use this.conf in the constructor
- this.conf = new CompoundConfiguration()
- .add(confParam)
- .add(family.getValues());
- this.blocksize = family.getBlocksize();
-
- this.dataBlockEncoder =
- new HFileDataBlockEncoderImpl(family.getDataBlockEncodingOnDisk(),
- family.getDataBlockEncoding());
-
- this.comparator = info.getComparator();
- // Get TTL
- this.ttl = getTTL(family);
- // used by ScanQueryMatcher
- long timeToPurgeDeletes =
- Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
- LOG.trace("Time to purge deletes set to " + timeToPurgeDeletes +
- "ms in store " + this);
- // Why not just pass a HColumnDescriptor in here altogether? Even if have
- // to clone it?
- scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator);
- this.memstore = new MemStore(conf, this.comparator);
-
- // By default, compact if storefile.count >= minFilesToCompact
- this.minFilesToCompact = Math.max(2,
- conf.getInt("hbase.hstore.compaction.min",
- /*old name*/ conf.getInt("hbase.hstore.compactionThreshold", 3)));
- LOG.info("hbase.hstore.compaction.min = " + this.minFilesToCompact);
-
- // Setting up cache configuration for this family
- this.cacheConf = new CacheConfig(conf, family);
- this.blockingStoreFileCount =
- conf.getInt("hbase.hstore.blockingStoreFiles", 7);
-
- this.maxFilesToCompact = conf.getInt("hbase.hstore.compaction.max", 10);
- this.minCompactSize = conf.getLong("hbase.hstore.compaction.min.size",
- this.region.memstoreFlushSize);
- this.maxCompactSize
- = conf.getLong("hbase.hstore.compaction.max.size", Long.MAX_VALUE);
-
- this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
-
- if (Store.closeCheckInterval == 0) {
- Store.closeCheckInterval = conf.getInt(
- "hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */);
- }
- this.storefiles = sortAndClone(loadStoreFiles());
-
- // Initialize checksum type from name. The names are CRC32, CRC32C, etc.
- this.checksumType = getChecksumType(conf);
- // initialize bytes per checksum
- this.bytesPerChecksum = getBytesPerChecksum(conf);
- // Create a compaction tool instance
- this.compactor = new Compactor(this.conf);
- }
-
- /**
- * @param family
- * @return
- */
- long getTTL(final HColumnDescriptor family) {
- // HCD.getTimeToLive returns ttl in seconds. Convert to milliseconds.
- long ttl = family.getTimeToLive();
- if (ttl == HConstants.FOREVER) {
- // Default is unlimited ttl.
- ttl = Long.MAX_VALUE;
- } else if (ttl == -1) {
- ttl = Long.MAX_VALUE;
- } else {
- // Second -> ms adjust for user data
- ttl *= 1000;
- }
- return ttl;
- }
-
- /**
- * Create this store's homedir
- * @param fs
- * @param homedir
- * @return Return <code>homedir</code>
+ public long updateColumnValue(byte[] row, byte[] f, byte[] qualifier, long newValue)
+ throws IOException;
+
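
A short usage sketch (the store handle and the row/family/qualifier names are hypothetical; imports elided):

    // Set a counter cell to 42 purely in memstore; the return value is the
    // heap-size delta caused by the new KeyValue.
    long delta = store.updateColumnValue(
        Bytes.toBytes("row1"), Bytes.toBytes("f"), Bytes.toBytes("hits"), 42L);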
+ /**
+ * Adds or replaces the specified KeyValues.
+ * <p>
+ * For each KeyValue specified, if a cell with the same row, family, and qualifier exists in
+ * MemStore, it will be replaced. Otherwise, it will just be inserted into MemStore.
+ * <p>
+ * This operation is atomic on each KeyValue (row/family/qualifier) but not necessarily atomic
+ * across all of them.
+ * @param kvs
+ * @return memstore size delta
* @throws IOException
*/
- Path createStoreHomeDir(final FileSystem fs,
- final Path homedir) throws IOException {
- if (!fs.exists(homedir)) {
- if (!fs.mkdirs(homedir))
- throw new IOException("Failed create of: " + homedir.toString());
- }
- return homedir;
- }
+ public long upsert(Iterable<KeyValue> kvs) throws IOException;
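
A sketch of the replace-or-insert semantics (store handle and names invented; imports elided):

    List<KeyValue> kvs = new ArrayList<KeyValue>();
    kvs.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("f"),
        Bytes.toBytes("q1"), Bytes.toBytes("v1")));
    kvs.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("f"),
        Bytes.toBytes("q2"), Bytes.toBytes("v2")));
    // Each KeyValue replaces any memstore cell with the same row/family/
    // qualifier; atomicity holds per KeyValue, not across the whole batch.
    long sizeDelta = store.upsert(kvs);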
- FileSystem getFileSystem() {
- return this.fs;
- }
+ /**
+ * Adds a value to the memstore
+ * @param kv
+ * @return memstore size delta
+ */
+ public long add(KeyValue kv);
/**
- * Returns the configured bytesPerChecksum value.
- * @param conf The configuration
- * @return The bytesPerChecksum that is set in the configuration
+ * Removes a kv from the memstore. The KeyValue is removed only if its key & memstoreTS match the
+ * key & memstoreTS value of the kv parameter.
+ * @param kv
*/
- public static int getBytesPerChecksum(Configuration conf) {
- return conf.getInt(HConstants.BYTES_PER_CHECKSUM,
- HFile.DEFAULT_BYTES_PER_CHECKSUM);
- }
+ public void rollback(final KeyValue kv);
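
A sketch of the add/rollback pairing (store handle and names invented):

    KeyValue kv = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("f"),
        Bytes.toBytes("q"), Bytes.toBytes("v"));
    long delta = store.add(kv);
    // ... if a later step of the enclosing mutation fails:
    store.rollback(kv);  // removed only while its key and memstoreTS still match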
/**
- * Returns the configured checksum algorithm.
- * @param conf The configuration
- * @return The checksum algorithm that is set in the configuration
+ * Find the key that matches <i>row</i> exactly, or the one that immediately precedes it. WARNING:
+ * Only use this method on a table where writes occur with strictly increasing timestamps. This
+ * method assumes this pattern of writes in order to make it reasonably performant. Also our
+ * search is dependent on the axiom that deletes are for cells that are in the container that
+ * follows, whether a memstore snapshot or a storefile, not for the current container: i.e. we'll
+ * see deletes before we come across cells we are to delete. Presumption is that the
+ * memstore#kvset is processed before memstore#snapshot and so on.
+ * @param row The row key of the targeted row.
+ * @return Found keyvalue or null if none found.
+ * @throws IOException
*/
- public static ChecksumType getChecksumType(Configuration conf) {
- String checksumName = conf.get(HConstants.CHECKSUM_TYPE_NAME);
- if (checksumName == null) {
- return HFile.DEFAULT_CHECKSUM_TYPE;
- } else {
- return ChecksumType.nameToType(checksumName);
- }
- }
+ public KeyValue getRowKeyAtOrBefore(final byte[] row) throws IOException;
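
A usage sketch (row key invented); this is the .META.-style closest-row-before lookup, so it is only safe under the strictly-increasing-timestamp write pattern described above:

    KeyValue closest = store.getRowKeyAtOrBefore(Bytes.toBytes("row150"));
    byte[] foundRow = (closest == null) ? null : closest.getRow();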
- public HColumnDescriptor getFamily() {
- return this.family;
- }
+ // Compaction oriented methods
+
+ public boolean throttleCompaction(long compactionSize);
/**
- * @return The maximum sequence id in all store files.
+ * Getter for the CompactionProgress object.
+ * @return CompactionProgress object; can be null
*/
- long getMaxSequenceId() {
- return StoreFile.getMaxSequenceIdInList(this.getStorefiles());
- }
+ public CompactionProgress getCompactionProgress();
+
+ public CompactionRequest requestCompaction() throws IOException;
+
+ public CompactionRequest requestCompaction(int priority) throws IOException;
- @Override
- public long getMaxMemstoreTS() {
- return StoreFile.getMaxMemstoreTSInList(this.getStorefiles());
- }
+ public void finishRequest(CompactionRequest cr);
/**
- * @param tabledir
- * @param encodedName Encoded region name.
- * @param family
- * @return Path to family/Store home directory.
+ * @return true if we should run a major compaction.
*/
- public static Path getStoreHomedir(final Path tabledir,
- final String encodedName, final byte [] family) {
- return new Path(tabledir, new Path(encodedName,
- new Path(Bytes.toString(family))));
- }
+ public boolean isMajorCompaction() throws IOException;
+
+ public void triggerMajorCompaction();
/**
- * Return the directory in which this store stores its
- * StoreFiles
+ * See if there are too many store files in this store.
+ * @return true if the number of store files is greater than the number defined in minFilesToCompact
*/
- Path getHomedir() {
- return homedir;
- }
+ public boolean needsCompaction();
- @Override
- public HFileDataBlockEncoder getDataBlockEncoder() {
- return dataBlockEncoder;
- }
+ public int getCompactPriority();
/**
- * Should be used only in tests.
- * @param blockEncoder the block delta encoder to use
+ * @param priority priority to check against. When priority is {@link Store#PRIORITY_USER},
+ * {@link Store#PRIORITY_USER} is returned.
+ * @return The priority that this store has in the compaction queue.
*/
- void setDataBlockEncoderInTest(HFileDataBlockEncoder blockEncoder) {
- this.dataBlockEncoder = blockEncoder;
- }
+ public int getCompactPriority(int priority);
+
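
A sketch of how these compaction methods fit together (the surrounding scheduling code is assumed, not shown):

    // A user-requested compaction keeps PRIORITY_USER; otherwise the store
    // derives a priority from its file count (<= 0 means blocking).
    int pri = store.getCompactPriority(Store.PRIORITY_USER);
    CompactionRequest cr = store.requestCompaction(pri);
    if (cr != null) {
      try {
        // ... hand cr to the compaction thread pool and run it ...
      } finally {
        store.finishRequest(cr);
      }
    }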
+ public StoreFlusher getStoreFlusher(long cacheFlushId);
- FileStatus[] getStoreFiles() throws IOException {
- return FSUtils.listStatus(this.fs, this.homedir, null);
- }
+ // Split oriented methods
+
+ public boolean canSplit();
/**
- * Creates an unsorted list of StoreFile loaded in parallel
- * from the given directory.
- * @throws IOException
+ * Determines if this Store should be split.
+ * @return the split point (byte[]) if the store should be split, null otherwise.
*/
- private List<StoreFile> loadStoreFiles() throws IOException {
- ArrayList<StoreFile> results = new ArrayList<StoreFile>();
- FileStatus files[] = getStoreFiles();
-
- if (files == null || files.length == 0) {
- return results;
- }
- // initialize the thread pool for opening store files in parallel..
- ThreadPoolExecutor storeFileOpenerThreadPool =
- this.region.getStoreFileOpenAndCloseThreadPool("StoreFileOpenerThread-" +
- this.family.getNameAsString());
- CompletionService<StoreFile> completionService =
- new ExecutorCompletionService<StoreFile>(storeFileOpenerThreadPool);
-
- int totalValidStoreFile = 0;
- for (int i = 0; i < files.length; i++) {
- // Skip directories.
- if (files[i].isDir()) {
- continue;
- }
- final Path p = files[i].getPath();
- // Check for empty file. Should never be the case but can happen
- // after data loss in hdfs for whatever reason (upgrade, etc.): HBASE-646
- if (this.fs.getFileStatus(p).getLen() <= 0) {
- LOG.warn("Skipping " + p + " because its empty. HBASE-646 DATA LOSS?");
- continue;
- }
-
- // open each store file in parallel
- completionService.submit(new Callable<StoreFile>() {
- public StoreFile call() throws IOException {
- StoreFile storeFile = new StoreFile(fs, p, conf, cacheConf,
- family.getBloomFilterType(), dataBlockEncoder);
- passSchemaMetricsTo(storeFile);
- storeFile.createReader();
- return storeFile;
- }
- });
- totalValidStoreFile++;
- }
-
- try {
- for (int i = 0; i < totalValidStoreFile; i++) {
- Future<StoreFile> future = completionService.take();
- StoreFile storeFile = future.get();
- long length = storeFile.getReader().length();
- this.storeSize += length;
- this.totalUncompressedBytes +=
- storeFile.getReader().getTotalUncompressedBytes();
- if (LOG.isDebugEnabled()) {
- LOG.debug("loaded " + storeFile.toStringDetailed());
- }
- results.add(storeFile);
- }
- } catch (InterruptedException e) {
- throw new IOException(e);
- } catch (ExecutionException e) {
- throw new IOException(e.getCause());
- } finally {
- storeFileOpenerThreadPool.shutdownNow();
- }
-
- return results;
- }
-
- @Override
- public long add(final KeyValue kv) {
- lock.readLock().lock();
- try {
- return this.memstore.add(kv);
- } finally {
- lock.readLock().unlock();
- }
- }
+ public byte[] getSplitPoint();
+
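
A split-check sketch (the region-side split call is elided):

    if (store.canSplit()) {            // typically false while reference files exist
      byte[] splitRow = store.getSplitPoint();
      if (splitRow != null) {
        // ask the enclosing region to split at splitRow
      }
    }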
+ // Bulk Load methods
/**
- * Adds a value to the memstore
- *
- * @param kv
- * @return memstore size delta
+ * This throws a WrongRegionException if the HFile does not fit in this region, or an
+ * InvalidHFileException if the HFile is not valid.
*/
- protected long delete(final KeyValue kv) {
- lock.readLock().lock();
- try {
- return this.memstore.delete(kv);
- } finally {
- lock.readLock().unlock();
- }
- }
-
- @Override
- public void rollback(final KeyValue kv) {
- lock.readLock().lock();
- try {
- this.memstore.rollback(kv);
- } finally {
- lock.readLock().unlock();
- }
- }
-
- /**
- * @return All store files.
- */
- @Override
- public List<StoreFile> getStorefiles() {
- return this.storefiles;
- }
-
- @Override
- public void assertBulkLoadHFileOk(Path srcPath) throws IOException {
- HFile.Reader reader = null;
- try {
- LOG.info("Validating hfile at " + srcPath + " for inclusion in "
- + "store " + this + " region " + this.region);
- reader = HFile.createReader(srcPath.getFileSystem(conf),
- srcPath, cacheConf);
- reader.loadFileInfo();
-
- byte[] firstKey = reader.getFirstRowKey();
- Preconditions.checkState(firstKey != null, "First key can not be null");
- byte[] lk = reader.getLastKey();
- Preconditions.checkState(lk != null, "Last key can not be null");
- byte[] lastKey = KeyValue.createKeyValueFromKey(lk).getRow();
-
- LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey) +
- " last=" + Bytes.toStringBinary(lastKey));
- LOG.debug("Region bounds: first=" +
- Bytes.toStringBinary(region.getStartKey()) +
- " last=" + Bytes.toStringBinary(region.getEndKey()));
-
- HRegionInfo hri = region.getRegionInfo();
- if (!hri.containsRange(firstKey, lastKey)) {
- throw new WrongRegionException(
- "Bulk load file " + srcPath.toString() + " does not fit inside region "
- + this.region);
- }
-
- if (verifyBulkLoads) {
- KeyValue prevKV = null;
- HFileScanner scanner = reader.getScanner(false, false, false);
- scanner.seekTo();
- do {
- KeyValue kv = scanner.getKeyValue();
- if (prevKV != null) {
- if (Bytes.compareTo(prevKV.getBuffer(), prevKV.getRowOffset(),
- prevKV.getRowLength(), kv.getBuffer(), kv.getRowOffset(),
- kv.getRowLength()) > 0) {
- throw new InvalidHFileException("Previous row is greater than"
- + " current row: path=" + srcPath + " previous="
- + Bytes.toStringBinary(prevKV.getKey()) + " current="
- + Bytes.toStringBinary(kv.getKey()));
- }
- if (Bytes.compareTo(prevKV.getBuffer(), prevKV.getFamilyOffset(),
- prevKV.getFamilyLength(), kv.getBuffer(), kv.getFamilyOffset(),
- kv.getFamilyLength()) != 0) {
- throw new InvalidHFileException("Previous key had different"
- + " family compared to current key: path=" + srcPath
- + " previous=" + Bytes.toStringBinary(prevKV.getFamily())
- + " current=" + Bytes.toStringBinary(kv.getFamily()));
- }
- }
- prevKV = kv;
- } while (scanner.next());
- }
- } finally {
- if (reader != null) reader.close();
- }
- }
-
- @Override
- public void bulkLoadHFile(String srcPathStr) throws IOException {
- Path srcPath = new Path(srcPathStr);
-
- // Copy the file if it's on another filesystem
- FileSystem srcFs = srcPath.getFileSystem(conf);
- FileSystem desFs = fs instanceof HFileSystem ? ((HFileSystem)fs).getBackingFs() : fs;
- if (!srcFs.equals(desFs)) {
- LOG.info("Bulk-load file " + srcPath + " is on different filesystem than " +
- "the destination store. Copying file over to destination filesystem.");
- Path tmpPath = getTmpPath();
- FileUtil.copy(srcFs, srcPath, fs, tmpPath, false, conf);
- LOG.info("Copied " + srcPath + " to temporary path on destination filesystem: " + tmpPath);
- srcPath = tmpPath;
- }
-
- Path dstPath = StoreFile.getRandomFilename(fs, homedir);
- LOG.debug("Renaming bulk load file " + srcPath + " to " + dstPath);
- StoreFile.rename(fs, srcPath, dstPath);
-
- StoreFile sf = new StoreFile(fs, dstPath, this.conf, this.cacheConf,
- this.family.getBloomFilterType(), this.dataBlockEncoder);
- passSchemaMetricsTo(sf);
-
- StoreFile.Reader r = sf.createReader();
- this.storeSize += r.length();
- this.totalUncompressedBytes += r.getTotalUncompressedBytes();
-
- LOG.info("Moved HFile " + srcPath + " into store directory " +
- homedir + " - updating store file list.");
-
- // Append the new storefile into the list
- this.lock.writeLock().lock();
- try {
- ArrayList<StoreFile> newFiles = new ArrayList<StoreFile>(storefiles);
- newFiles.add(sf);
- this.storefiles = sortAndClone(newFiles);
- } finally {
- // We need the lock, as long as we are updating the storefiles
- // or changing the memstore. Let us release it before calling
- // notifyChangeReadersObservers. See HBASE-4485 for a possible
- // deadlock scenario that could have happened if continue to hold
- // the lock.
- this.lock.writeLock().unlock();
- }
- notifyChangedReadersObservers();
- LOG.info("Successfully loaded store file " + srcPath
- + " into store " + this + " (new location: " + dstPath + ")");
- }
-
- /**
- * Get a temporary path in this region. These temporary files
- * will get cleaned up when the region is re-opened if they are
- * still around.
- */
- private Path getTmpPath() throws IOException {
- return StoreFile.getRandomFilename(
- fs, region.getTmpDir());
- }
-
- @Override
- public ImmutableList<StoreFile> close() throws IOException {
- this.lock.writeLock().lock();
- try {
- ImmutableList<StoreFile> result = storefiles;
-
- // Clear so metrics doesn't find them.
- storefiles = ImmutableList.of();
-
- if (!result.isEmpty()) {
- // initialize the thread pool for closing store files in parallel.
- ThreadPoolExecutor storeFileCloserThreadPool = this.region
- .getStoreFileOpenAndCloseThreadPool("StoreFileCloserThread-"
- + this.family.getNameAsString());
-
- // close each store file in parallel
- CompletionService<Void> completionService =
- new ExecutorCompletionService<Void>(storeFileCloserThreadPool);
- for (final StoreFile f : result) {
- completionService.submit(new Callable<Void>() {
- public Void call() throws IOException {
- f.closeReader(true);
- return null;
- }
- });
- }
-
- try {
- for (int i = 0; i < result.size(); i++) {
- Future<Void> future = completionService.take();
- future.get();
- }
- } catch (InterruptedException e) {
- throw new IOException(e);
- } catch (ExecutionException e) {
- throw new IOException(e.getCause());
- } finally {
- storeFileCloserThreadPool.shutdownNow();
- }
- }
- LOG.info("Closed " + this);
- return result;
- } finally {
- this.lock.writeLock().unlock();
- }
- }
-
- /**
- * Snapshot this stores memstore. Call before running
- * {@link #flushCache(long, SortedSet, TimeRangeTracker, AtomicLong, MonitoredTask)} so it has
- * some work to do.
- */
- void snapshot() {
- this.memstore.snapshot();
- }
-
- /**
- * Write out current snapshot. Presumes {@link #snapshot()} has been called
- * previously.
- * @param logCacheFlushId flush sequence number
- * @param snapshot
- * @param snapshotTimeRangeTracker
- * @param flushedSize The number of bytes flushed
- * @param status
- * @return Path The path name of the tmp file to which the store was flushed
- * @throws IOException
- */
- private Path flushCache(final long logCacheFlushId,
- SortedSet<KeyValue> snapshot,
- TimeRangeTracker snapshotTimeRangeTracker,
- AtomicLong flushedSize,
- MonitoredTask status) throws IOException {
- // If an exception happens flushing, we let it out without clearing
- // the memstore snapshot. The old snapshot will be returned when we say
- // 'snapshot', the next time flush comes around.
- return internalFlushCache(
- snapshot, logCacheFlushId, snapshotTimeRangeTracker, flushedSize, status);
- }
-
- /*
- * @param cache
- * @param logCacheFlushId
- * @param snapshotTimeRangeTracker
- * @param flushedSize The number of bytes flushed
- * @return Path The path name of the tmp file to which the store was flushed
- * @throws IOException
+ public void assertBulkLoadHFileOk(Path srcPath) throws IOException;
+
+ /**
+ * This method should only be called from HRegion. It is assumed that the ranges of values in the
+ * HFile fit within the store's assigned region. (assertBulkLoadHFileOk checks this)
*/
- private Path internalFlushCache(final SortedSet<KeyValue> set,
- final long logCacheFlushId,
- TimeRangeTracker snapshotTimeRangeTracker,
- AtomicLong flushedSize,
- MonitoredTask status)
- throws IOException {
- StoreFile.Writer writer;
- // Find the smallest read point across all the Scanners.
- long smallestReadPoint = region.getSmallestReadPoint();
- long flushed = 0;
- Path pathName;
- // Don't flush if there are no entries.
- if (set.size() == 0) {
- return null;
- }
- // Use a store scanner to find which rows to flush.
- // Note that we need to retain deletes, hence
- // treat this as a minor compaction.
- InternalScanner scanner = null;
- KeyValueScanner memstoreScanner = new CollectionBackedScanner(set, this.comparator);
- if (getHRegion().getCoprocessorHost() != null) {
- scanner = getHRegion().getCoprocessorHost().preFlushScannerOpen(this, memstoreScanner);
- }
- if (scanner == null) {
- Scan scan = new Scan();
- scan.setMaxVersions(scanInfo.getMaxVersions());
- scanner = new StoreScanner(this, scanInfo, scan,
- Collections.singletonList(memstoreScanner), ScanType.MINOR_COMPACT,
- this.region.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
- }
- if (getHRegion().getCoprocessorHost() != null) {
- InternalScanner cpScanner =
- getHRegion().getCoprocessorHost().preFlush(this, scanner);
- // NULL scanner returned from coprocessor hooks means skip normal processing
- if (cpScanner == null) {
- return null;
- }
- scanner = cpScanner;
- }
- try {
- int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, 10);
- // TODO: We can fail in the below block before we complete adding this
- // flush to list of store files. Add cleanup of anything put on filesystem
- // if we fail.
- synchronized (flushLock) {
- status.setStatus("Flushing " + this + ": creating writer");
- // A. Write the map out to the disk
- writer = createWriterInTmp(set.size());
- writer.setTimeRangeTracker(snapshotTimeRangeTracker);
- pathName = writer.getPath();
- try {
- List<KeyValue> kvs = new ArrayList<KeyValue>();
- boolean hasMore;
- do {
- hasMore = scanner.next(kvs, compactionKVMax);
- if (!kvs.isEmpty()) {
- for (KeyValue kv : kvs) {
- // If we know that this KV is going to be included always, then let us
- // set its memstoreTS to 0. This will help us save space when writing to disk.
- if (kv.getMemstoreTS() <= smallestReadPoint) {
- // let us not change the original KV. It could be in the memstore
- // changing its memstoreTS could affect other threads/scanners.
- kv = kv.shallowCopy();
- kv.setMemstoreTS(0);
- }
- writer.append(kv);
- flushed += this.memstore.heapSizeChange(kv, true);
- }
- kvs.clear();
- }
- } while (hasMore);
- } finally {
- // Write out the log sequence number that corresponds to this output
- // hfile. The hfile is current up to and including logCacheFlushId.
- status.setStatus("Flushing " + this + ": appending metadata");
- writer.appendMetadata(logCacheFlushId, false);
- status.setStatus("Flushing " + this + ": closing flushed file");
- writer.close();
- }
- }
- } finally {
- flushedSize.set(flushed);
- scanner.close();
- }
- if (LOG.isInfoEnabled()) {
- LOG.info("Flushed " +
- ", sequenceid=" + logCacheFlushId +
- ", memsize=" + StringUtils.humanReadableInt(flushed) +
- ", into tmp file " + pathName);
- }
- return pathName;
- }
-
- /*
- * @param path The pathname of the tmp file into which the store was flushed
- * @param logCacheFlushId
- * @return StoreFile created.
- * @throws IOException
+ public void bulkLoadHFile(String srcPathStr) throws IOException;
+
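
A bulk-load sketch (the path is invented):

    Path hfile = new Path("/tmp/bulkload/f/hfile.1");
    // Throws WrongRegionException / InvalidHFileException on a bad file.
    store.assertBulkLoadHFileOk(hfile);
    store.bulkLoadHFile(hfile.toString());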
+ // General accessors into the state of the store
+ // TODO abstract some of this out into a metrics class
+
+ /**
+ * @return <tt>true</tt> if the store has any underlying reference files to older HFiles
*/
- private StoreFile commitFile(final Path path,
- final long logCacheFlushId,
- TimeRangeTracker snapshotTimeRangeTracker,
- AtomicLong flushedSize,
- MonitoredTask status)
- throws IOException {
- // Write-out finished successfully, move into the right spot
- String fileName = path.getName();
- Path dstPath = new Path(homedir, fileName);
- validateStoreFile(path);
- String msg = "Renaming flushed file at " + path + " to " + dstPath;
- LOG.debug(msg);
- status.setStatus("Flushing " + this + ": " + msg);
- if (!fs.rename(path, dstPath)) {
- LOG.warn("Unable to rename " + path + " to " + dstPath);
- }
-
- status.setStatus("Flushing " + this + ": reopening flushed file");
- StoreFile sf = new StoreFile(this.fs, dstPath, this.conf, this.cacheConf,
- this.family.getBloomFilterType(), this.dataBlockEncoder);
- passSchemaMetricsTo(sf);
-
- StoreFile.Reader r = sf.createReader();
- this.storeSize += r.length();
- this.totalUncompressedBytes += r.getTotalUncompressedBytes();
-
- // This increments the metrics associated with total flushed bytes for this
- // family. The overall flush count is stored in the static metrics and
- // retrieved from HRegion.recentFlushes, which is set within
- // HRegion.internalFlushcache, which indirectly calls this to actually do
- // the flushing through the StoreFlusherImpl class
- getSchemaMetrics().updatePersistentStoreMetric(
- SchemaMetrics.StoreMetricType.FLUSH_SIZE, flushedSize.longValue());
- if (LOG.isInfoEnabled()) {
- LOG.info("Added " + sf + ", entries=" + r.getEntries() +
- ", sequenceid=" + logCacheFlushId +
- ", filesize=" + StringUtils.humanReadableInt(r.length()));
- }
- return sf;
- }
-
- /*
- * @param maxKeyCount
- * @return Writer for a new StoreFile in the tmp dir.
- */
- private StoreFile.Writer createWriterInTmp(int maxKeyCount)
- throws IOException {
- return createWriterInTmp(maxKeyCount, this.family.getCompression(), false);
- }
-
- /*
- * @param maxKeyCount
- * @param compression Compression algorithm to use
- * @param isCompaction whether we are creating a new file in a compaction
- * @return Writer for a new StoreFile in the tmp dir.
- */
- StoreFile.Writer createWriterInTmp(int maxKeyCount,
- Compression.Algorithm compression, boolean isCompaction)
- throws IOException {
- final CacheConfig writerCacheConf;
- if (isCompaction) {
- // Don't cache data on write on compactions.
- writerCacheConf = new CacheConfig(cacheConf);
- writerCacheConf.setCacheDataOnWrite(false);
- } else {
- writerCacheConf = cacheConf;
- }
- StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf,
- fs, blocksize)
- .withOutputDir(region.getTmpDir())
- .withDataBlockEncoder(dataBlockEncoder)
- .withComparator(comparator)
- .withBloomType(family.getBloomFilterType())
- .withMaxKeyCount(maxKeyCount)
- .withChecksumType(checksumType)
- .withBytesPerChecksum(bytesPerChecksum)
- .withCompression(compression)
- .build();
- // The store file writer's path does not include the CF name, so we need
- // to configure the HFile writer directly.
- SchemaConfigured sc = (SchemaConfigured) w.writer;
- SchemaConfigured.resetSchemaMetricsConf(sc);
- passSchemaMetricsTo(sc);
- return w;
- }
-
- /*
- * Change storefiles adding into place the Reader produced by this new flush.
- * @param sf
- * @param set That was used to make the passed file <code>p</code>.
- * @throws IOException
- * @return Whether compaction is required.
+ public boolean hasReferences();
+
+ /**
+ * @return The size of this store's memstore, in bytes
*/
- private boolean updateStorefiles(final StoreFile sf,
- final SortedSet<KeyValue> set)
- throws IOException {
- this.lock.writeLock().lock();
- try {
- ArrayList<StoreFile> newList = new ArrayList<StoreFile>(storefiles);
- newList.add(sf);
- storefiles = sortAndClone(newList);
-
- this.memstore.clearSnapshot(set);
- } finally {
- // We need the lock, as long as we are updating the storefiles
- // or changing the memstore. Let us release it before calling
- // notifyChangeReadersObservers. See HBASE-4485 for a possible
- // deadlock scenario that could have happened if continue to hold
- // the lock.
- this.lock.writeLock().unlock();
- }
+ public long getMemStoreSize();
- // Tell listeners of the change in readers.
- notifyChangedReadersObservers();
+ public HColumnDescriptor getFamily();
- return needsCompaction();
- }
+ /**
+ * @return The maximum memstoreTS in all store files.
+ */
+ public long getMaxMemstoreTS();
- /*
- * Notify all observers that set of Readers has changed.
- * @throws IOException
+ /**
+ * @return the data block encoder
*/
- private void notifyChangedReadersObservers() throws IOException {
- for (ChangedReadersObserver o: this.changedReaderObservers) {
- o.updateReaders();
- }
- }
-
- /**
- * Get all scanners with no filtering based on TTL (that happens further down
- * the line).
- * @return all scanners for this store
- */
- protected List<KeyValueScanner> getScanners(boolean cacheBlocks,
- boolean isGet,
- boolean isCompaction,
- ScanQueryMatcher matcher) throws IOException {
- List<StoreFile> storeFiles;
- List<KeyValueScanner> memStoreScanners;
- this.lock.readLock().lock();
- try {
- storeFiles = this.getStorefiles();
- memStoreScanners = this.memstore.getScanners();
- } finally {
- this.lock.readLock().unlock();
- }
-
- // First the store file scanners
-
- // TODO this used to get the store files in descending order,
- // but now we get them in ascending order, which I think is
- // actually more correct, since memstore get put at the end.
- List<StoreFileScanner> sfScanners = StoreFileScanner
- .getScannersForStoreFiles(storeFiles, cacheBlocks, isGet, isCompaction, matcher);
- List<KeyValueScanner> scanners =
- new ArrayList<KeyValueScanner>(sfScanners.size()+1);
- scanners.addAll(sfScanners);
- // Then the memstore scanners
- scanners.addAll(memStoreScanners);
- return scanners;
- }
-
- /*
- * @param o Observer who wants to know about changes in set of Readers
- */
- void addChangedReaderObserver(ChangedReadersObserver o) {
- this.changedReaderObservers.add(o);
- }
-
- /*
- * @param o Observer no longer interested in changes in set of Readers.
- */
- void deleteChangedReaderObserver(ChangedReadersObserver o) {
- // We don't check if observer present; it may not be (legitimately)
- this.changedReaderObservers.remove(o);
- }
-
- //////////////////////////////////////////////////////////////////////////////
- // Compaction
- //////////////////////////////////////////////////////////////////////////////
-
- /**
- * Compact the StoreFiles. This method may take some time, so the calling
- * thread must be able to block for long periods.
- *
- * <p>During this time, the Store can work as usual, getting values from
- * StoreFiles and writing new StoreFiles from the memstore.
- *
- * Existing StoreFiles are not destroyed until the new compacted StoreFile is
- * completely written-out to disk.
- *
- * <p>The compactLock prevents multiple simultaneous compactions.
- * The structureLock prevents us from interfering with other write operations.
- *
- * <p>We don't want to hold the structureLock for the whole time, as a compact()
- * can be lengthy and we want to allow cache-flushes during this period.
- *
- * @param cr
- * compaction details obtained from requestCompaction()
- * @throws IOException
- * @return Storefile we compacted into or null if we failed or opted out early.
+ public HFileDataBlockEncoder getDataBlockEncoder();
+
+ /**
+ * @return the number of files in this store
*/
- StoreFile compact(CompactionRequest cr) throws IOException {
- if (cr == null || cr.getFiles().isEmpty()) return null;
- Preconditions.checkArgument(cr.getStore().toString().equals(this.toString()));
- List<StoreFile> filesToCompact = cr.getFiles();
- synchronized (filesCompacting) {
- // sanity check: we're compacting files that this store knows about
- // TODO: change this to LOG.error() after more debugging
- Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
- }
-
- // Max-sequenceID is the last key in the files we're compacting
- long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);
-
- // Ready to go. Have list of files to compact.
- LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
- + this + " of "
- + this.region.getRegionInfo().getRegionNameAsString()
- + " into tmpdir=" + region.getTmpDir() + ", seqid=" + maxId + ", totalSize="
- + StringUtils.humanReadableInt(cr.getSize()));
-
- StoreFile sf = null;
- try {
- StoreFile.Writer writer =
- this.compactor.compact(this, filesToCompact, cr.isMajor(), maxId);
- // Move the compaction into place.
- if (this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
- sf = completeCompaction(filesToCompact, writer);
- if (region.getCoprocessorHost() != null) {
- region.getCoprocessorHost().postCompact(this, sf);
- }
- } else {
- // Create storefile around what we wrote with a reader on it.
- sf = new StoreFile(this.fs, writer.getPath(), this.conf, this.cacheConf,
- this.family.getBloomFilterType(), this.dataBlockEncoder);
- sf.createReader();
- }
- } finally {
- synchronized (filesCompacting) {
- filesCompacting.removeAll(filesToCompact);
- }
- }
-
- LOG.info("Completed" + (cr.isMajor() ? " major " : " ") + "compaction of "
- + filesToCompact.size() + " file(s) in " + this + " of "
- + this.region.getRegionInfo().getRegionNameAsString()
- + " into " +
- (sf == null ? "none" : sf.getPath().getName()) +
- ", size=" + (sf == null ? "none" :
- StringUtils.humanReadableInt(sf.getReader().length()))
- + "; total size for store is "
- + StringUtils.humanReadableInt(storeSize));
- return sf;
- }
-
- @Override
- public void compactRecentForTesting(int N) throws IOException {
- List<StoreFile> filesToCompact;
- long maxId;
- boolean isMajor;
-
- this.lock.readLock().lock();
- try {
- synchronized (filesCompacting) {
- filesToCompact = Lists.newArrayList(storefiles);
- if (!filesCompacting.isEmpty()) {
- // exclude all files older than the newest file we're currently
- // compacting. this allows us to preserve contiguity (HBASE-2856)
- StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
- int idx = filesToCompact.indexOf(last);
- Preconditions.checkArgument(idx != -1);
- filesToCompact.subList(0, idx + 1).clear();
- }
- int count = filesToCompact.size();
- if (N > count) {
- throw new RuntimeException("Not enough files");
- }
-
- filesToCompact = filesToCompact.subList(count - N, count);
- maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);
- isMajor = (filesToCompact.size() == storefiles.size());
- filesCompacting.addAll(filesToCompact);
- Collections.sort(filesCompacting, StoreFile.Comparators.FLUSH_TIME);
- }
- } finally {
- this.lock.readLock().unlock();
- }
-
- try {
- // Ready to go. Have list of files to compact.
- StoreFile.Writer writer =
- this.compactor.compact(this, filesToCompact, isMajor, maxId);
- // Move the compaction into place.
- StoreFile sf = completeCompaction(filesToCompact, writer);
- if (region.getCoprocessorHost() != null) {
- region.getCoprocessorHost().postCompact(this, sf);
- }
- } finally {
- synchronized (filesCompacting) {
- filesCompacting.removeAll(filesToCompact);
- }
- }
- }
-
- @Override
- public boolean hasReferences() {
- return hasReferences(this.storefiles);
- }
-
- /*
- * @param files
- * @return True if any of the files in <code>files</code> are References.
- */
- private boolean hasReferences(Collection<StoreFile> files) {
- if (files != null && files.size() > 0) {
- for (StoreFile hsf: files) {
- if (hsf.isReference()) {
- return true;
- }
- }
- }
- return false;
- }
-
- /*
- * Gets lowest timestamp from candidate StoreFiles
- *
- * @param fs
- * @param dir
- * @throws IOException
+ public int getNumberOfStoreFiles();
+
+ /** @return aggregate size of all HStores used in the last compaction */
+ public long getLastCompactSize();
+
+ /** @return aggregate size of HStore */
+ public long getSize();
+
+ /**
+ * @return Count of store files
*/
- public static long getLowestTimestamp(final List<StoreFile> candidates)
- throws IOException {
- long minTs = Long.MAX_VALUE;
- for (StoreFile storeFile : candidates) {
- minTs = Math.min(minTs, storeFile.getModificationTimeStamp());
- }
- return minTs;
- }
-
- @Override
- public CompactionProgress getCompactionProgress() {
- return this.compactor.getProgress();
- }
-
- @Override
- public boolean isMajorCompaction() throws IOException {
- for (StoreFile sf : this.storefiles) {
- if (sf.getReader() == null) {
- LOG.debug("StoreFile " + sf + " has null Reader");
- return false;
- }
- }
-
- List<StoreFile> candidates = new ArrayList<StoreFile>(this.storefiles);
-
- // exclude files above the max compaction threshold
- // except: save all references. we MUST compact them
- int pos = 0;
- while (pos < candidates.size() &&
- candidates.get(pos).getReader().length() > this.maxCompactSize &&
- !candidates.get(pos).isReference()) ++pos;
- candidates.subList(0, pos).clear();
-
- return isMajorCompaction(candidates);
- }
-
- /*
- * @param filesToCompact Files to compact. Can be null.
- * @return True if we should run a major compaction.
- */
- private boolean isMajorCompaction(final List<StoreFile> filesToCompact) throws IOException {
- boolean result = false;
- long mcTime = getNextMajorCompactTime();
- if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
- return result;
- }
- // TODO: Use better method for determining stamp of last major (HBASE-2990)
- long lowTimestamp = getLowestTimestamp(filesToCompact);
- long now = System.currentTimeMillis();
- if (lowTimestamp > 0l && lowTimestamp < (now - mcTime)) {
- // Major compaction time has elapsed.
- if (filesToCompact.size() == 1) {
- // Single file
- StoreFile sf = filesToCompact.get(0);
- long oldest =
- (sf.getReader().timeRangeTracker == null) ?
- Long.MIN_VALUE :
- now - sf.getReader().timeRangeTracker.minimumTimestamp;
- if (sf.isMajorCompaction() &&
- (this.ttl == HConstants.FOREVER || oldest < this.ttl)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Skipping major compaction of " + this +
- " because one (major) compacted file only and oldestTime " +
- oldest + "ms is < ttl=" + this.ttl);
- }
- } else if (this.ttl != HConstants.FOREVER && oldest > this.ttl) {
- LOG.debug("Major compaction triggered on store " + this +
- ", because keyvalues outdated; time since last major compaction " +
- (now - lowTimestamp) + "ms");
- result = true;
- }
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Major compaction triggered on store " + this +
- "; time since last major compaction " + (now - lowTimestamp) + "ms");
- }
- result = true;
- }
- }
- return result;
- }
-
- long getNextMajorCompactTime() {
- // default = 24hrs
- long ret = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24);
- if (family.getValue(HConstants.MAJOR_COMPACTION_PERIOD) != null) {
- String strCompactionTime =
- family.getValue(HConstants.MAJOR_COMPACTION_PERIOD);
- ret = (new Long(strCompactionTime)).longValue();
- }
-
- if (ret > 0) {
- // default = 20% = +/- 4.8 hrs
- double jitterPct = conf.getFloat("hbase.hregion.majorcompaction.jitter",
- 0.20F);
- if (jitterPct > 0) {
- long jitter = Math.round(ret * jitterPct);
- // deterministic jitter avoids a major compaction storm on restart
- ImmutableList<StoreFile> snapshot = storefiles;
- if (snapshot != null && !snapshot.isEmpty()) {
- String seed = snapshot.get(0).getPath().getName();
- double curRand = new Random(seed.hashCode()).nextDouble();
- ret += jitter - Math.round(2L * jitter * curRand);
- } else {
- ret = 0; // no storefiles == no major compaction
- }
- }
- }
- return ret;
- }
-
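
A standalone worked example of the jitter arithmetic above; the seed string stands in for the oldest storefile's name, so every restart computes the same offset and major compactions don't storm:

    public class JitterExample {
      public static void main(String[] args) {
        long period = 1000L * 60 * 60 * 24;            // default 24h period
        double jitterPct = 0.20;                       // default 20% jitter
        long jitter = Math.round(period * jitterPct);  // ~ +/- 4.8h window
        double curRand = new java.util.Random("3f2e1d".hashCode()).nextDouble();
        long next = period + jitter - Math.round(2L * jitter * curRand);
        // next lands uniformly in [period - jitter, period + jitter].
        System.out.println("next major compaction due in ~" + next + " ms");
      }
    }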
- public CompactionRequest requestCompaction() throws IOException {
- return requestCompaction(HStore.NO_PRIORITY);
- }
-
- public CompactionRequest requestCompaction(int priority) throws IOException {
- // don't even select for compaction if writes are disabled
- if (!this.region.areWritesEnabled()) {
- return null;
- }
-
- CompactionRequest ret = null;
- this.lock.readLock().lock();
- try {
- synchronized (filesCompacting) {
- // candidates = all storefiles not already in compaction queue
- List<StoreFile> candidates = Lists.newArrayList(storefiles);
- if (!filesCompacting.isEmpty()) {
- // exclude all files older than the newest file we're currently
- // compacting. this allows us to preserve contiguity (HBASE-2856)
- StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
- int idx = candidates.indexOf(last);
- Preconditions.checkArgument(idx != -1);
- candidates.subList(0, idx + 1).clear();
- }
-
- boolean override = false;
- if (region.getCoprocessorHost() != null) {
- override = region.getCoprocessorHost().preCompactSelection(
- this, candidates);
- }
- CompactSelection filesToCompact;
- if (override) {
- // coprocessor is overriding normal file selection
- filesToCompact = new CompactSelection(conf, candidates);
- } else {
- filesToCompact = compactSelection(candidates, priority);
- }
-
- if (region.getCoprocessorHost() != null) {
- region.getCoprocessorHost().postCompactSelection(this,
- ImmutableList.copyOf(filesToCompact.getFilesToCompact()));
- }
-
- // no files to compact
- if (filesToCompact.getFilesToCompact().isEmpty()) {
- return null;
- }
-
- // basic sanity check: do not try to compact the same StoreFile twice.
- if (!Collections.disjoint(filesCompacting, filesToCompact.getFilesToCompact())) {
- // TODO: change this from an IAE to LOG.error after sufficient testing
- Preconditions.checkArgument(false, "%s overlaps with %s",
- filesToCompact, filesCompacting);
- }
- filesCompacting.addAll(filesToCompact.getFilesToCompact());
- Collections.sort(filesCompacting, StoreFile.Comparators.FLUSH_TIME);
-
- // major compaction iff all StoreFiles are included
- boolean isMajor = (filesToCompact.getFilesToCompact().size() == this.storefiles.size());
- if (isMajor) {
- // since we're enqueuing a major, update the compaction wait interval
- this.forceMajor = false;
- }
-
- // everything went better than expected. create a compaction request
- int pri = getCompactPriority(priority);
- ret = new CompactionRequest(region, this, filesToCompact, isMajor, pri);
- }
- } finally {
- this.lock.readLock().unlock();
- }
- if (ret != null) {
- CompactionRequest.preRequest(ret);
- }
- return ret;
- }
-
- public void finishRequest(CompactionRequest cr) {
- CompactionRequest.postRequest(cr);
- cr.finishRequest();
- synchronized (filesCompacting) {
- filesCompacting.removeAll(cr.getFiles());
- }
- }
-
- /**
- * Algorithm to choose which files to compact, see {@link #compactSelection(java.util.List, int)}
- * @param candidates
- * @return
- * @throws IOException
+ public int getStorefilesCount();
+
+ /**
+ * @return The size of the store files, in bytes, uncompressed.
*/
- CompactSelection compactSelection(List<StoreFile> candidates) throws IOException {
- return compactSelection(candidates,HStore.NO_PRIORITY);
- }
-
- /**
- * Algorithm to choose which files to compact
- *
- * Configuration knobs:
- * "hbase.hstore.compaction.ratio"
- * normal case: minor compact when file <= sum(smaller_files) * ratio
- * "hbase.hstore.compaction.min.size"
- * unconditionally compact individual files below this size
- * "hbase.hstore.compaction.max.size"
- * never compact individual files above this size (unless splitting)
- * "hbase.hstore.compaction.min"
- * min files needed to minor compact
- * "hbase.hstore.compaction.max"
- * max files to compact at once (avoids OOM)
- *
- * @param candidates candidate files, ordered from oldest to newest
- * @return subset copy of candidate list that meets compaction criteria
- * @throws IOException
+ public long getStoreSizeUncompressed();
+
+ /**
+ * @return The size of the store files, in bytes.
*/
- CompactSelection compactSelection(List<StoreFile> candidates, int priority)
- throws IOException {
- // ASSUMPTION!!! filesCompacting is locked when calling this function
-
- /* normal skew:
- *
- * older ----> newer
- * _
- * | | _
- * | | | | _
- * --|-|- |-|- |-|---_-------_------- minCompactSize
- * | | | | | | | | _ | |
- * | | | | | | | | | | | |
- * | | | | | | | | | | | |
- */
- CompactSelection compactSelection = new CompactSelection(conf, candidates);
-
- boolean forcemajor = this.forceMajor && filesCompacting.isEmpty();
- if (!forcemajor) {
- // Delete the expired store files before the compaction selection.
- if (conf.getBoolean("hbase.store.delete.expired.storefile", true)
- && (ttl != Long.MAX_VALUE) && (this.scanInfo.minVersions == 0)) {
- CompactSelection expiredSelection = compactSelection
- .selectExpiredStoreFilesToCompact(
- EnvironmentEdgeManager.currentTimeMillis() - this.ttl);
-
- // If there is any expired store files, delete them by compaction.
- if (expiredSelection != null) {
- return expiredSelection;
- }
- }
- // do not compact old files above a configurable threshold
- // save all references. we MUST compact them
- int pos = 0;
- while (pos < compactSelection.getFilesToCompact().size() &&
- compactSelection.getFilesToCompact().get(pos).getReader().length()
- > maxCompactSize &&
- !compactSelection.getFilesToCompact().get(pos).isReference()) ++pos;
- if (pos != 0) compactSelection.clearSubList(0, pos);
- }
-
- if (compactSelection.getFilesToCompact().isEmpty()) {
- LOG.debug(this.getHRegionInfo().getEncodedName() + " - " +
- this + ": no store files to compact");
- compactSelection.emptyFileList();
- return compactSelection;
- }
-
- // Force a major compaction if this is a user-requested major compaction,
- // or if we do not have too many files to compact and this was requested
- // as a major compaction
- boolean majorcompaction = (forcemajor && priority == HStore.PRIORITY_USER) ||
- (forcemajor || isMajorCompaction(compactSelection.getFilesToCompact())) &&
- (compactSelection.getFilesToCompact().size() < this.maxFilesToCompact
- );
- LOG.debug(this.getHRegionInfo().getEncodedName() + " - " +
- this.getColumnFamilyName() + ": Initiating " +
- (majorcompaction ? "major" : "minor") + "compaction");
-
- if (!majorcompaction &&
- !hasReferences(compactSelection.getFilesToCompact())) {
- // we're doing a minor compaction, let's see what files are applicable
- int start = 0;
- double r = compactSelection.getCompactSelectionRatio();
-
- // skip selection algorithm if we don't have enough files
- if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Not compacting files because we only have " +
- compactSelection.getFilesToCompact().size() +
- " files ready for compaction. Need " + this.minFilesToCompact + " to initiate.");
- }
- compactSelection.emptyFileList();
- return compactSelection;
- }
-
- // remove bulk import files that request to be excluded from minors
- compactSelection.getFilesToCompact().removeAll(Collections2.filter(
- compactSelection.getFilesToCompact(),
- new Predicate<StoreFile>() {
- public boolean apply(StoreFile input) {
- return input.excludeFromMinorCompaction();
- }
- }));
-
- /* TODO: add sorting + unit test back in when HBASE-2856 is fixed
- // Sort files by size to correct when normal skew is altered by bulk load.
- Collections.sort(filesToCompact, StoreFile.Comparators.FILE_SIZE);
- */
-
- // get store file sizes for incremental compacting selection.
- int countOfFiles = compactSelection.getFilesToCompact().size();
- long [] fileSizes = new long[countOfFiles];
- long [] sumSize = new long[countOfFiles];
- for (int i = countOfFiles-1; i >= 0; --i) {
- StoreFile file = compactSelection.getFilesToCompact().get(i);
- fileSizes[i] = file.getReader().length();
- // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
- int tooFar = i + this.maxFilesToCompact - 1;
- sumSize[i] = fileSizes[i]
- + ((i+1 < countOfFiles) ? sumSize[i+1] : 0)
- - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
- }
-
- /* Start at the oldest file and stop when you find the first file that
- * meets compaction criteria:
- * (1) a recently-flushed, small file (i.e. <= minCompactSize)
- * OR
- * (2) within the compactRatio of sum(newer_files)
- * Given normal skew, any newer files will also meet this criteria
- *
- * Additional Note:
- * If fileSizes.size() >> maxFilesToCompact, we will recurse on
- * compact(). Consider the oldest files first to avoid a
- * situation where we always compact [end-threshold,end). Then, the
- * last file becomes an aggregate of the previous compactions.
- */
- while(countOfFiles - start >= this.minFilesToCompact &&
- fileSizes[start] >
- Math.max(minCompactSize, (long)(sumSize[start+1] * r))) {
- ++start;
- }
- int end = Math.min(countOfFiles, start + this.maxFilesToCompact);
- long totalSize = fileSizes[start]
- + ((start+1 < countOfFiles) ? sumSize[start+1] : 0);
- compactSelection = compactSelection.getSubList(start, end);
-
- // if we don't have enough files to compact, just wait
- if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Skipped compaction of " + this
- + ". Only " + (end - start) + " file(s) of size "
- + StringUtils.humanReadableInt(totalSize)
- + " have met compaction criteria.");
- }
- compactSelection.emptyFileList();
- return compactSelection;
- }
- } else {
- if(majorcompaction) {
- if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {
- LOG.debug("Warning, compacting more than " + this.maxFilesToCompact +
- " files, probably because of a user-requested major compaction");
- if(priority != HStore.PRIORITY_USER) {
- LOG.error("Compacting more than max files on a non user-requested compaction");
- }
- }
- } else if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {
- // all files included in this compaction, up to max
- int pastMax = compactSelection.getFilesToCompact().size() - this.maxFilesToCompact;
- compactSelection.getFilesToCompact().subList(0, pastMax).clear();
- }
- }
- return compactSelection;
- }
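
For readers tracing the selection loop above, here is a minimal, self-contained sketch of the ratio-based selection over plain file sizes. Everything in it (class and method names, the thresholds, the sample sizes) is illustrative rather than HBase API:

    import java.util.Arrays;

    public class RatioSelectionSketch {
      // Sliding-window sum and ratio test mirroring the loop above: skip a
      // file while it is bigger than max(minCompactSize, ratio * sum of the
      // sizes of the next maxFiles-1 newer files).
      static int[] selectFilesToCompact(long[] fileSizes, int minFiles,
          int maxFiles, long minCompactSize, double ratio) {
        int n = fileSizes.length;
        long[] sumSize = new long[n + 1];  // sumSize[i] = sum of fileSizes[i, i+maxFiles-1)
        for (int i = n - 1; i >= 0; --i) {
          int tooFar = i + maxFiles - 1;
          sumSize[i] = fileSizes[i] + sumSize[i + 1]
              - (tooFar < n ? fileSizes[tooFar] : 0);
        }
        int start = 0;
        while (n - start >= minFiles
            && fileSizes[start] > Math.max(minCompactSize,
                   (long) (sumSize[start + 1] * ratio))) {
          ++start;
        }
        int end = Math.min(n, start + maxFiles);
        if (end - start < minFiles) return null;  // not enough files; wait
        return new int[] { start, end };
      }

      public static void main(String[] args) {
        // Oldest first: one 100MB file followed by four small files.
        long[] sizes = { 100L << 20, 12L << 20, 6L << 20, 5L << 20, 4L << 20 };
        // With ratio 1.2 the 100MB file is skipped (too big relative to the
        // newer files) and the window [1, 5) is selected.
        System.out.println(Arrays.toString(
            selectFilesToCompact(sizes, 3, 10, 3L << 20, 1.2)));
      }
    }
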
-
- /**
- * Validates a store file by opening and closing it. In HFileV2 this should
- * not be an expensive operation.
- *
- * @param path the path to the store file
- */
- private void validateStoreFile(Path path)
- throws IOException {
- StoreFile storeFile = null;
- try {
- storeFile = new StoreFile(this.fs, path, this.conf,
- this.cacheConf, this.family.getBloomFilterType(),
- NoOpDataBlockEncoder.INSTANCE);
- passSchemaMetricsTo(storeFile);
- storeFile.createReader();
- } catch (IOException e) {
- LOG.error("Failed to open store file : " + path
- + ", keeping it in tmp location", e);
- throw e;
- } finally {
- if (storeFile != null) {
- storeFile.closeReader(false);
- }
- }
- }
-
- /*
-    * <p>Completes a compaction whose result has already been written to disk.
- *
- * <p>It is usually invoked at the end of a compaction, but might also be
- * invoked at HStore startup, if the prior execution died midway through.
- *
-    * <p>Moving the compaction result into place means:
-    * <pre>
-    * 1) Moving the new compacted StoreFile into place
-    * 2) Unloading all replaced StoreFiles, closing them and collecting the list to delete
-    * 3) Loading the new file list
-    * 4) Computing the new store size
- * </pre>
- *
- * @param compactedFiles list of files that were compacted
- * @param compactedFile StoreFile that is the result of the compaction
- * @return StoreFile created. May be null.
- * @throws IOException
+ public long getStorefilesSize();
+
+ /**
+ * @return The size of the store file indexes, in bytes.
*/
- StoreFile completeCompaction(final Collection<StoreFile> compactedFiles,
- final StoreFile.Writer compactedFile)
- throws IOException {
- // 1. Moving the new files into place -- if there is a new file (may not
- // be if all cells were expired or deleted).
- StoreFile result = null;
- if (compactedFile != null) {
- validateStoreFile(compactedFile.getPath());
- // Move the file into the right spot
- Path origPath = compactedFile.getPath();
- Path destPath = new Path(homedir, origPath.getName());
- LOG.info("Renaming compacted file at " + origPath + " to " + destPath);
- if (!fs.rename(origPath, destPath)) {
- LOG.error("Failed move of compacted file " + origPath + " to " +
- destPath);
- throw new IOException("Failed move of compacted file " + origPath +
- " to " + destPath);
- }
- result = new StoreFile(this.fs, destPath, this.conf, this.cacheConf,
- this.family.getBloomFilterType(), this.dataBlockEncoder);
- passSchemaMetricsTo(result);
- result.createReader();
- }
- try {
- this.lock.writeLock().lock();
- try {
- // Change this.storefiles so it reflects new state but do not
- // delete old store files until we have sent out notification of
- // change in case old files are still being accessed by outstanding
- // scanners.
- ArrayList<StoreFile> newStoreFiles = Lists.newArrayList(storefiles);
- newStoreFiles.removeAll(compactedFiles);
- filesCompacting.removeAll(compactedFiles); // safe bc: lock.writeLock()
-
- // If a StoreFile result, move it into place. May be null.
- if (result != null) {
- newStoreFiles.add(result);
- }
-
- this.storefiles = sortAndClone(newStoreFiles);
- } finally {
- // We need the lock, as long as we are updating the storefiles
- // or changing the memstore. Let us release it before calling
- // notifyChangeReadersObservers. See HBASE-4485 for a possible
- // deadlock scenario that could have happened if continue to hold
- // the lock.
- this.lock.writeLock().unlock();
- }
-
- // Tell observers that list of StoreFiles has changed.
- notifyChangedReadersObservers();
-
- // let the archive util decide if we should archive or delete the files
- LOG.debug("Removing store files after compaction...");
- HFileArchiver.archiveStoreFiles(this.fs, this.region, this.conf, this.family.getName(),
- compactedFiles);
-
- } catch (IOException e) {
- e = RemoteExceptionHandler.checkIOException(e);
- LOG.error("Failed replacing compacted files in " + this +
- ". Compacted file is " + (result == null? "none": result.toString()) +
- ". Files replaced " + compactedFiles.toString() +
- " some of which may have been already removed", e);
- }
-
- // 4. Compute new store size
- this.storeSize = 0L;
- this.totalUncompressedBytes = 0L;
- for (StoreFile hsf : this.storefiles) {
- StoreFile.Reader r = hsf.getReader();
- if (r == null) {
- LOG.warn("StoreFile " + hsf + " has a null Reader");
- continue;
- }
- this.storeSize += r.length();
- this.totalUncompressedBytes += r.getTotalUncompressedBytes();
- }
- return result;
- }
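
The ordering inside completeCompaction is deliberate: the store file list is swapped under the write lock, the lock is released, and only then are readers notified (see HBASE-4485 for the deadlock this avoids). A minimal sketch of that copy-on-write pattern, with hypothetical names (Holder, swap, notifyObservers):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Illustrative only: swap an immutable snapshot under a write lock, then
    // notify observers *after* releasing the lock, so an observer that needs
    // the read lock cannot deadlock against us (cf. HBASE-4485).
    class Holder<T> {
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
      private volatile List<T> items = Collections.emptyList();

      void swap(List<T> remove, T add) {
        lock.writeLock().lock();
        try {
          List<T> next = new ArrayList<>(items);
          next.removeAll(remove);
          if (add != null) next.add(add);
          items = Collections.unmodifiableList(next);
        } finally {
          lock.writeLock().unlock();  // release before notifying
        }
        notifyObservers();            // observers may take the read lock safely
      }

      List<T> snapshot() {
        lock.readLock().lock();
        try {
          return items;               // immutable, safe to hand out
        } finally {
          lock.readLock().unlock();
        }
      }

      void notifyObservers() { /* e.g. reset open scanners */ }
    }
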
-
- public ImmutableList<StoreFile> sortAndClone(List<StoreFile> storeFiles) {
- Collections.sort(storeFiles, StoreFile.Comparators.FLUSH_TIME);
- ImmutableList<StoreFile> newList = ImmutableList.copyOf(storeFiles);
- return newList;
- }
-
- // ////////////////////////////////////////////////////////////////////////////
- // Accessors.
- // (This is the only section that is directly useful!)
- //////////////////////////////////////////////////////////////////////////////
- @Override
- public int getNumberOfStoreFiles() {
- return this.storefiles.size();
- }
-
- /*
- * @param wantedVersions How many versions were asked for.
-    * @return wantedVersions or this family's {@link HConstants#VERSIONS}.
- */
- int versionsToReturn(final int wantedVersions) {
- if (wantedVersions <= 0) {
- throw new IllegalArgumentException("Number of versions must be > 0");
- }
- // Make sure we do not return more than maximum versions for this store.
- int maxVersions = this.family.getMaxVersions();
- return wantedVersions > maxVersions ? maxVersions: wantedVersions;
- }
-
- static boolean isExpired(final KeyValue key, final long oldestTimestamp) {
- return key.getTimestamp() < oldestTimestamp;
- }
-
- @Override
- public KeyValue getRowKeyAtOrBefore(final byte[] row) throws IOException {
- // If minVersions is set, we will not ignore expired KVs.
- // As we're only looking for the latest matches, that should be OK.
- // With minVersions > 0 we guarantee that any KV that has any version
- // at all (expired or not) has at least one version that will not expire.
-     // Note that this method used to take a KeyValue as an argument. A
-     // KeyValue can be back-dated; a row key cannot.
- long ttlToUse = scanInfo.getMinVersions() > 0 ? Long.MAX_VALUE : this.ttl;
-
- KeyValue kv = new KeyValue(row, HConstants.LATEST_TIMESTAMP);
-
- GetClosestRowBeforeTracker state = new GetClosestRowBeforeTracker(
- this.comparator, kv, ttlToUse, this.region.getRegionInfo().isMetaRegion());
- this.lock.readLock().lock();
- try {
- // First go to the memstore. Pick up deletes and candidates.
- this.memstore.getRowKeyAtOrBefore(state);
- // Check if match, if we got a candidate on the asked for 'kv' row.
- // Process each store file. Run through from newest to oldest.
- for (StoreFile sf : Lists.reverse(storefiles)) {
- // Update the candidate keys from the current map file
- rowAtOrBeforeFromStoreFile(sf, state);
- }
- return state.getCandidate();
- } finally {
- this.lock.readLock().unlock();
- }
- }
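
As a rough model of the search order in getRowKeyAtOrBefore (memstore first, then store files from newest to oldest, keeping the best candidate seen so far), here is a sketch over sorted maps; TreeMap.floorKey stands in for the per-file seek, and deletes and TTL expiry are ignored:

    import java.util.Arrays;
    import java.util.List;
    import java.util.TreeMap;

    public class ClosestRowSketch {
      // Each TreeMap plays the role of one source (memstore or store file),
      // keyed by row. floorKey(row) is the analogue of seeking a file scanner
      // to the last row at or before the target.
      static String rowAtOrBefore(String row, List<TreeMap<String, String>> sources) {
        String best = null;
        for (TreeMap<String, String> source : sources) {  // newest to oldest
          String candidate = source.floorKey(row);
          if (candidate != null && (best == null || candidate.compareTo(best) > 0)) {
            best = candidate;  // closer to (but not past) the target row
          }
        }
        return best;
      }

      public static void main(String[] args) {
        TreeMap<String, String> memstore = new TreeMap<>();
        memstore.put("row-10", "v1");
        TreeMap<String, String> storeFile = new TreeMap<>();
        storeFile.put("row-15", "v2");
        storeFile.put("row-30", "v3");
        // Prints row-15, the closest row at or before row-20 across all sources.
        System.out.println(rowAtOrBefore("row-20", Arrays.asList(memstore, storeFile)));
      }
    }
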
-
- /*
- * Check an individual MapFile for the row at or before a given row.
- * @param f
- * @param state
- * @throws IOException
+ public long getStorefilesIndexSize();
+
+ /**
+ * Returns the total size of all index blocks in the data block indexes, including the root level,
+ * intermediate levels, and the leaf level for multi-level indexes, or just the root level for
+ * single-level indexes.
+ * @return the total size of block indexes in the store
*/
- private void rowAtOrBeforeFromStoreFile(final StoreFile f,
- final GetClosestRowBeforeTracker state)
- throws IOException {
- StoreFile.Reader r = f.getReader();
- if (r == null) {
- LOG.warn("StoreFile " + f + " has a null Reader");
- return;
- }
- if (r.getEntries() == 0) {
-       LOG.warn("StoreFile " + f + " is an empty store file");
- return;
- }
- // TODO: Cache these keys rather than make each time?
- byte [] fk = r.getFirstKey();
- KeyValue firstKV = KeyValue.createKeyValueFromKey(fk, 0, fk.length);
- byte [] lk = r.getLastKey();
- KeyValue lastKV = KeyValue.createKeyValueFromKey(lk, 0, lk.length);
- KeyValue firstOnRow = state.getTargetKey();
- if (this.comparator.compareRows(lastKV, firstOnRow) < 0) {
- // If last key in file is not of the target table, no candidates in this
- // file. Return.
- if (!state.isTargetTable(lastKV)) return;
- // If the row we're looking for is past the end of file, set search key to
- // last key. TODO: Cache last and first key rather than make each time.
- firstOnRow = new KeyValue(lastKV.getRow(), HConstants.LATEST_TIMESTAMP);
- }
- // Get a scanner that caches blocks and that uses pread.
- HFileScanner scanner = r.getHFileReader().getScanner(true, true, false);
- // Seek scanner. If can't seek it, return.
- if (!seekToScanner(scanner, firstOnRow, firstKV)) return;
-     // If we found a candidate on firstOnRow, just return; this is unlikely,
-     // as there will rarely be an instance of the actual first row in the table.
- if (walkForwardInSingleRow(scanner, firstOnRow, state)) return;
- // If here, need to start backing up.
- while (scanner.seekBefore(firstOnRow.getBuffer(), firstOnRow.getKeyOffset(),
- firstOnRow.getKeyLength())) {
- KeyValue kv = scanner.getKeyValue();
- if (!state.isTargetTable(kv)) break;
- if (!state.isBetterCandidate(kv)) break;
- // Make new first on row.
- firstOnRow = new KeyValue(kv.getRow(), HConstants.LATEST_TIMESTAMP);
- // Seek scanner. If can't seek it, break.
- if (!seekToScanner(scanner, firstOnRow, firstKV)) break;
- // If we find something, break;
- if (walkForwardInSingleRow(scanner, firstOnRow, state)) break;
- }
- }
-
- /*
- * Seek the file scanner to firstOnRow or first entry in file.
- * @param scanner
- * @param firstOnRow
- * @param firstKV
- * @return True if we successfully seeked scanner.
- * @throws IOException
+ public long getTotalStaticIndexSize();
+
+ /**
+ * Returns the total byte size of all Bloom filter bit arrays. For compound Bloom filters, this
+ * includes Bloom blocks that are not currently loaded into the block cache.
+ * @return the total size of all Bloom filters in the store
*/
- private boolean seekToScanner(final HFileScanner scanner,
- final KeyValue firstOnRow,
- final KeyValue firstKV)
- throws IOException {
- KeyValue kv = firstOnRow;
-     // If firstOnRow is on the same row as the file's first key, seek to the full first key
- if (this.comparator.compareRows(firstKV, firstOnRow) == 0) kv = firstKV;
- int result = scanner.seekTo(kv.getBuffer(), kv.getKeyOffset(),
- kv.getKeyLength());
- return result >= 0;
- }
-
- /*
- * When we come in here, we are probably at the kv just before we break into
-    * the row that firstOnRow is on. We usually need to advance one position
-    * to get onto the row we are interested in.
- * @param scanner
- * @param firstOnRow
- * @param state
- * @return True we found a candidate.
- * @throws IOException
+ public long getTotalStaticBloomSize();
+
+ // Test-helper methods
+
+ /**
+ * Compact the most recent N files. Used in testing.
+ * @param N the number of files to compact; must be less than or equal to the current number of files.
+ * @throws IOException on failure
*/
- private boolean walkForwardInSingleRow(final HFileScanner scanner,
- final KeyValue firstOnRow,
- final GetClosestRowBeforeTracker state)
- throws IOException {
- boolean foundCandidate = false;
- do {
- KeyValue kv = scanner.getKeyValue();
- // If we are not in the row, skip.
- if (this.comparator.compareRows(kv, firstOnRow) < 0) continue;
- // Did we go beyond the target row? If so break.
- if (state.isTooFar(kv, firstOnRow)) break;
- if (state.isExpired(kv)) {
- continue;
- }
- // If we added something, this row is a contender. break.
- if (state.handle(kv)) {
- foundCandidate = true;
- break;
- }
- } while(scanner.next());
- return foundCandidate;
- }
-
- public boolean canSplit() {
- this.lock.readLock().lock();
- try {
-       // Not splittable if we find a reference store file present in the store.
- for (StoreFile sf : storefiles) {
- if (sf.isReference()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(sf + " is not splittable");
- }
- return false;
- }
- }
-
- return true;
- } finally {
- this.lock.readLock().unlock();
- }
- }
-
- @Override
- public byte[] getSplitPoint() {
- this.lock.readLock().lock();
- try {
- // sanity checks
- if (this.storefiles.isEmpty()) {
- return null;
- }
- // Should already be enforced by the split policy!
- assert !this.region.getRegionInfo().isMetaRegion();
-
-       // Not splittable if we find a reference store file present in the store.
- long maxSize = 0L;
- StoreFile largestSf = null;
- for (StoreFile sf : storefiles) {
- if (sf.isReference()) {
- // Should already be enforced since we return false in this case
- assert false : "getSplitPoint() called on a region that can't split!";
- return null;
- }
-
- StoreFile.Reader r = sf.getReader();
- if (r == null) {
- LOG.warn("Storefile " + sf + " Reader is null");
- continue;
- }
-
- long size = r.length();
- if (size > maxSize) {
- // This is the largest one so far
- maxSize = size;
- largestSf = sf;
- }
- }
-
- StoreFile.Reader r = largestSf.getReader();
- if (r == null) {
- LOG.warn("Storefile " + largestSf + " Reader is null");
- return null;
- }
-       // Get the first, last, and mid keys. The midkey is the key that starts
-       // the block in the middle of the hfile; it carries a column and
-       // timestamp, but we return just the row we want to split on.
- byte [] midkey = r.midkey();
- if (midkey != null) {
- KeyValue mk = KeyValue.createKeyValueFromKey(midkey, 0, midkey.length);
- byte [] fk = r.getFirstKey();
- KeyValue firstKey = KeyValue.createKeyValueFromKey(fk, 0, fk.length);
- byte [] lk = r.getLastKey();
- KeyValue lastKey = KeyValue.createKeyValueFromKey(lk, 0, lk.length);
- // if the midkey is the same as the first or last keys, then we cannot
- // (ever) split this region.
- if (this.comparator.compareRows(mk, firstKey) == 0 ||
- this.comparator.compareRows(mk, lastKey) == 0) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("cannot split because midkey is the same as first or " +
- "last row");
- }
- return null;
- }
- return mk.getRow();
- }
- } catch(IOException e) {
- LOG.warn("Failed getting store size for " + this, e);
- } finally {
- this.lock.readLock().unlock();
- }
- return null;
- }
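
The split-point rule above boils down to: take the midkey of the largest store file and refuse to split when its row equals the first or last row, since that would produce an empty daughter region. A toy version over plain row keys (pickSplitRow and the sample data are illustrative only):

    public class SplitPointSketch {
      // rows: the sorted row keys of the largest store file; midRow models the
      // row of the key that starts the middle block of the file.
      static String pickSplitRow(String[] rows) {
        if (rows.length == 0) return null;
        String midRow = rows[rows.length / 2];
        if (midRow.equals(rows[0]) || midRow.equals(rows[rows.length - 1])) {
          return null;  // cannot (ever) split: midkey row equals first or last row
        }
        return midRow;
      }

      public static void main(String[] args) {
        System.out.println(pickSplitRow(new String[] { "a", "b", "c", "d", "e" }));  // c
        System.out.println(pickSplitRow(new String[] { "a", "a", "a" }));            // null
      }
    }
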
-
- @Override
- public long getLastCompactSize() {
- return this.lastCompactSize;
- }
-
- @Override
- public long getSize() {
- return storeSize;
- }
-
- public void triggerMajorCompaction() {
- this.forceMajor = true;
- }
-
- boolean getForceMajorCompaction() {
- return this.forceMajor;
- }
-
- //////////////////////////////////////////////////////////////////////////////
- // File administration
- //////////////////////////////////////////////////////////////////////////////
-
- @Override
- public KeyValueScanner getScanner(Scan scan,
- final NavigableSet<byte []> targetCols) throws IOException {
- lock.readLock().lock();
- try {
- KeyValueScanner scanner = null;
- if (getHRegion().getCoprocessorHost() != null) {
- scanner = getHRegion().getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols);
- }
- if (scanner == null) {
- scanner = new StoreScanner(this, getScanInfo(), scan, targetCols);
- }
- return scanner;
- } finally {
- lock.readLock().unlock();
- }
- }
-
- @Override
- public String toString() {
- return getColumnFamilyName();
- }
-
- @Override
- public int getStorefilesCount() {
- return this.storefiles.size();
- }
-
- @Override
- public long getStoreSizeUncompressed() {
- return this.totalUncompressedBytes;
- }
-
- @Override
- public long getStorefilesSize() {
- long size = 0;
- for (StoreFile s: storefiles) {
- StoreFile.Reader r = s.getReader();
- if (r == null) {
- LOG.warn("StoreFile " + s + " has a null Reader");
- continue;
- }
- size += r.length();
- }
- return size;
- }
-
- @Override
- public long getStorefilesIndexSize() {
- long size = 0;
- for (StoreFile s: storefiles) {
- StoreFile.Reader r = s.getReader();
- if (r == null) {
- LOG.warn("StoreFile " + s + " has a null Reader");
- continue;
- }
- size += r.indexSize();
- }
- return size;
- }
-
- @Override
- public long getTotalStaticIndexSize() {
- long size = 0;
- for (StoreFile s : storefiles) {
- size += s.getReader().getUncompressedDataIndexSize();
- }
- return size;
- }
-
- @Override
- public long getTotalStaticBloomSize() {
- long size = 0;
- for (StoreFile s : storefiles) {
- StoreFile.Reader r = s.getReader();
- size += r.getTotalBloomSize();
- }
- return size;
- }
-
- @Override
- public long getMemStoreSize() {
- return this.memstore.heapSize();
- }
-
- public int getCompactPriority() {
- return getCompactPriority(HStore.NO_PRIORITY);
- }
-
- @Override
- public int getCompactPriority(int priority) {
- // If this is a user-requested compaction, leave this at the highest priority
- if(priority == HStore.PRIORITY_USER) {
- return HStore.PRIORITY_USER;
- } else {
- return this.blockingStoreFileCount - this.storefiles.size();
- }
- }
-
- @Override
- public boolean throttleCompaction(long compactionSize) {
- // see HBASE-5867 for discussion on the default
- long throttlePoint = conf.getLong(
- "hbase.regionserver.thread.compaction.throttle",
- 2 * this.minFilesToCompact * this.region.memstoreFlushSize);
- return compactionSize > throttlePoint;
- }
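
To make the default throttle point concrete: with minFilesToCompact = 3 and a 128MB memstore flush size, any compaction over 2 * 3 * 128MB = 768MB is routed to the large-compaction pool. A hedged sketch of that routing decision (the pool names and constants are assumptions, not configuration read from HBase):

    public class ThrottleSketch {
      static final long MEMSTORE_FLUSH_SIZE = 128L << 20;  // 128MB, example value
      static final int MIN_FILES_TO_COMPACT = 3;           // example value
      // Default for "hbase.regionserver.thread.compaction.throttle":
      static final long THROTTLE_POINT = 2L * MIN_FILES_TO_COMPACT * MEMSTORE_FLUSH_SIZE;

      static String poolFor(long compactionSize) {
        // Mirrors throttleCompaction(): big requests go to a separate pool so
        // one long-running compaction cannot starve the small minor ones.
        return compactionSize > THROTTLE_POINT ? "largeCompactions" : "smallCompactions";
      }

      public static void main(String[] args) {
        System.out.println(poolFor(900L << 20));  // largeCompactions (900MB > 768MB)
        System.out.println(poolFor(100L << 20));  // smallCompactions
      }
    }
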
-
- @Override
- public HRegion getHRegion() {
- return this.region;
- }
-
- HRegionInfo getHRegionInfo() {
- return this.region.regionInfo;
- }
-
- @Override
- public long updateColumnValue(byte [] row, byte [] f,
- byte [] qualifier, long newValue)
- throws IOException {
-
- this.lock.readLock().lock();
- try {
- long now = EnvironmentEdgeManager.currentTimeMillis();
-
- return this.memstore.updateColumnValue(row,
- f,
- qualifier,
- newValue,
- now);
-
- } finally {
- this.lock.readLock().unlock();
- }
- }
-
- @Override
- public long upsert(Iterable<KeyValue> kvs) throws IOException {
- this.lock.readLock().lock();
- try {
- // TODO: Make this operation atomic w/ MVCC
- return this.memstore.upsert(kvs);
- } finally {
- this.lock.readLock().unlock();
- }
- }
-
- public StoreFlusher getStoreFlusher(long cacheFlushId) {
- return new StoreFlusherImpl(cacheFlushId);
- }
-
- private class StoreFlusherImpl implements StoreFlusher {
-
- private long cacheFlushId;
- private SortedSet<KeyValue> snapshot;
- private StoreFile storeFile;
- private Path storeFilePath;
- private TimeRangeTracker snapshotTimeRangeTracker;
- private AtomicLong flushedSize;
-
- private StoreFlusherImpl(long cacheFlushId) {
- this.cacheFlushId = cacheFlushId;
- this.flushedSize = new AtomicLong();
- }
-
- @Override
- public void prepare() {
- memstore.snapshot();
- this.snapshot = memstore.getSnapshot();
- this.snapshotTimeRangeTracker = memstore.getSnapshotTimeRangeTracker();
- }
-
- @Override
- public void flushCache(MonitoredTask status) throws IOException {
- storeFilePath = Store.this.flushCache(
- cacheFlushId, snapshot, snapshotTimeRangeTracker, flushedSize, status);
- }
-
- @Override
- public boolean commit(MonitoredTask status) throws IOException {
- if (storeFilePath == null) {
- return false;
- }
- storeFile = Store.this.commitFile(storeFilePath, cacheFlushId,
- snapshotTimeRangeTracker, flushedSize, status);
- if (Store.this.getHRegion().getCoprocessorHost() != null) {
- Store.this.getHRegion()
- .getCoprocessorHost()
- .postFlush(Store.this, storeFile);
- }
-
- // Add new file to store files. Clear snapshot too while we have
- // the Store write lock.
- return Store.this.updateStorefiles(storeFile, snapshot);
- }
- }
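
The StoreFlusher above is a three-phase contract: prepare snapshots the memstore, flushCache writes the snapshot to a temporary file, and commit swaps the new file into the store. A minimal sketch of how a caller might sequence the phases, using stand-in types rather than the real classes (the real driver is the region's flush path, which also handles the WAL and error cases):

    import java.io.IOException;

    public class FlushDriverSketch {
      // Stand-in for the contract above; not the real HBase interface.
      interface StoreFlusher {
        void prepare();                        // snapshot the memstore
        void flushCache() throws IOException;  // write snapshot to a tmp file
        boolean commit() throws IOException;   // swap the new file into the store
      }

      // Roughly how a caller sequences the three phases.
      static boolean flushStore(StoreFlusher flusher) throws IOException {
        flusher.prepare();
        flusher.flushCache();
        return flusher.commit();  // false if nothing was flushed
      }

      public static void main(String[] args) throws IOException {
        boolean added = flushStore(new StoreFlusher() {
          public void prepare() { System.out.println("snapshot memstore"); }
          public void flushCache() { System.out.println("write tmp store file"); }
          public boolean commit() { System.out.println("swap in new file"); return true; }
        });
        System.out.println("new store file added: " + added);
      }
    }
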
-
- @Override
- public boolean needsCompaction() {
- return (storefiles.size() - filesCompacting.size()) > minFilesToCompact;
- }
-
- @Override
- public CacheConfig getCacheConfig() {
- return this.cacheConf;
- }
-
- public static final long FIXED_OVERHEAD =
-       ClassSize.align(SchemaConfigured.SCHEMA_CONFIGURED_UNALIGNED_HEAP_SIZE
-       + (17 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG)
- + (5 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
-
- public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
- + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
- + ClassSize.CONCURRENT_SKIPLISTMAP
- + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT
- + ScanInfo.FIXED_OVERHEAD);
-
- @Override
- public long heapSize() {
- return DEEP_OVERHEAD + this.memstore.heapSize();
- }
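
The overhead constants above simply count an object's header and fields, then round up to the JVM's 8-byte alignment. A toy version of that accounting, with assumed 64-bit field sizes (the real ClassSize distinguishes 32- and 64-bit layouts):

    public class HeapSizeSketch {
      // Assumed 64-bit sizes; illustrative only.
      static final int REF = 8, LONG = 8, INT = 4, BOOL = 1, OBJECT_HEADER = 16;

      static long align(long size) {
        return (size + 7) & ~7L;  // round up to the next 8-byte boundary
      }

      public static void main(String[] args) {
        // An object with 2 references, 2 longs, 2 ints and a boolean:
        long fixedOverhead = align(OBJECT_HEADER + 2 * REF + 2 * LONG + 2 * INT + BOOL);
        System.out.println(fixedOverhead);  // 64: 57 bytes of fields, aligned up
      }
    }
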
-
- public KeyValue.KVComparator getComparator() {
- return comparator;
- }
-
- public ScanInfo getScanInfo() {
- return scanInfo;
- }
-
- /**
- * Immutable information for scans over a store.
- */
- public static class ScanInfo {
- private byte[] family;
- private int minVersions;
- private int maxVersions;
- private long ttl;
- private boolean keepDeletedCells;
- private long timeToPurgeDeletes;
- private KVComparator comparator;
-
- public static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
- + (2 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_INT)
-         + (2 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN);
-
- /**
- * @param family {@link HColumnDescriptor} describing the column family
- * @param ttl Store's TTL (in ms)
- * @param timeToPurgeDeletes duration in ms after which a delete marker can
- * be purged during a major compaction.
- * @param comparator The store's comparator
- */
- public ScanInfo(HColumnDescriptor family, long ttl, long timeToPurgeDeletes, KVComparator comparator) {
- this(family.getName(), family.getMinVersions(), family.getMaxVersions(), ttl, family
- .getKeepDeletedCells(), timeToPurgeDeletes, comparator);
- }
- /**
- * @param family Name of this store's column family
- * @param minVersions Store's MIN_VERSIONS setting
- * @param maxVersions Store's VERSIONS setting
-    * @param ttl Store's TTL (in ms)
-    * @param keepDeletedCells Store's keepDeletedCells setting
-    * @param timeToPurgeDeletes duration in ms after which a delete marker can
-    *        be purged during a major compaction.
-    * @param comparator The store's comparator
- */
- public ScanInfo(byte[] family, int minVersions, int maxVersions, long ttl,
- boolean keepDeletedCells, long timeToPurgeDeletes,
- KVComparator comparator) {
-
- this.family = family;
- this.minVersions = minVersions;
- this.maxVersions = maxVersions;
- this.ttl = ttl;
- this.keepDeletedCells = keepDeletedCells;
- this.timeToPurgeDeletes = timeToPurgeDeletes;
- this.comparator = comparator;
- }
-
- public byte[] getFamily() {
- return family;
- }
-
- public int getMinVersions() {
- return minVersions;
- }
-
- public int getMaxVersions() {
- return maxVersions;
- }
-
- public long getTtl() {
- return ttl;
- }
-
- public boolean getKeepDeletedCells() {
- return keepDeletedCells;
- }
-
- public long getTimeToPurgeDeletes() {
- return timeToPurgeDeletes;
- }
-
- public KVComparator getComparator() {
- return comparator;
- }
- }
+ public void compactRecentForTesting(int N) throws IOException;
-}
+ /**
+ * Used for tests.
+ * @return cache configuration for this Store.
+ */
+ public CacheConfig getCacheConfig();
+
+ /**
+ * @return the parent region hosting this store
+ */
+ public HRegion getHRegion();
+}
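
With these accessors on the interface, callers such as metrics exporters can report per-store sizes without depending on the implementation class. A hedged usage sketch, assuming the usual accessors (getColumnFamilyName, getStorefilesCount, getMemStoreSize) are also part of the interface:

    import org.apache.hadoop.hbase.regionserver.Store;

    public final class StoreFootprint {
      // Illustrative only: summarize a store's footprint via the new interface.
      static String describe(Store store) {
        return store.getColumnFamilyName()
            + ": files=" + store.getStorefilesCount()
            + " dataBytes=" + store.getStorefilesSize()
            + " indexBytes=" + store.getStorefilesIndexSize()
            + " staticIndexBytes=" + store.getTotalStaticIndexSize()
            + " bloomBytes=" + store.getTotalStaticBloomSize()
            + " memstoreBytes=" + store.getMemStoreSize();
      }
    }
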
[... 2 lines stripped ...]