You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2015/09/24 19:56:28 UTC
[3/3] hbase git commit: Backport HBASE-14098 (Allow dropping caches
behind compactions) to 0.98 (Liu Shaohui)
Backport HBASE-14098 (Allow dropping caches behind compactions) to 0.98 (Liu Shaohui)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/672d74e7
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/672d74e7
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/672d74e7
Branch: refs/heads/0.98
Commit: 672d74e7ba8c6caaedf3a549c30db5e1cbb4289b
Parents: 06f1270
Author: Andrew Purtell <ap...@apache.org>
Authored: Thu Sep 24 09:52:21 2015 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Thu Sep 24 09:52:21 2015 -0700
----------------------------------------------------------------------
.../hbase/io/FSDataInputStreamWrapper.java | 38 +++++++++++++++++---
.../hadoop/hbase/io/hfile/CacheConfig.java | 26 +++++++++++---
.../org/apache/hadoop/hbase/io/hfile/HFile.java | 25 +++++++++++++
.../hbase/regionserver/DefaultStoreFlusher.java | 7 ++--
.../hadoop/hbase/regionserver/HStore.java | 22 ++++++++----
.../apache/hadoop/hbase/regionserver/Store.java | 25 +++++++++++--
.../hadoop/hbase/regionserver/StoreFile.java | 24 +++++++++----
.../hbase/regionserver/StoreFileInfo.java | 12 ++++---
.../hbase/regionserver/StoreFileScanner.java | 29 +++++++++++++--
.../hbase/regionserver/StripeStoreFlusher.java | 6 +++-
.../regionserver/compactions/Compactor.java | 10 ++++--
.../compactions/DefaultCompactor.java | 14 +++++---
.../compactions/StripeCompactor.java | 8 +++--
.../hadoop/hbase/io/hfile/TestCacheOnWrite.java | 2 +-
.../hbase/regionserver/TestFSErrorsExposed.java | 2 +-
.../regionserver/TestReversibleScanners.java | 8 ++---
.../hbase/regionserver/TestStripeCompactor.java | 3 +-
.../compactions/TestStripeCompactionPolicy.java | 3 +-
18 files changed, 212 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/672d74e7/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
index 5950585..e36ca8e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
@@ -18,12 +18,14 @@
package org.apache.hadoop.hbase.io;
import java.io.IOException;
+import java.lang.reflect.Method;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.fs.HFileSystem;
-import org.apache.hadoop.hbase.io.FileLink;
import com.google.common.annotations.VisibleForTesting;
@@ -33,6 +35,8 @@ import com.google.common.annotations.VisibleForTesting;
* see method comments.
*/
public class FSDataInputStreamWrapper {
+ static final Log LOG = LogFactory.getLog(FSDataInputStreamWrapper.class);
+
private final HFileSystem hfs;
private final Path path;
private final FileLink link;
@@ -76,14 +80,23 @@ public class FSDataInputStreamWrapper {
private volatile int hbaseChecksumOffCount = -1;
public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException {
- this(fs, null, path);
+ this(fs, null, path, false);
+ }
+
+ public FSDataInputStreamWrapper(FileSystem fs, Path path, boolean dropBehind) throws IOException {
+ this(fs, null, path, dropBehind);
}
public FSDataInputStreamWrapper(FileSystem fs, FileLink link) throws IOException {
- this(fs, link, null);
+ this(fs, link, null, false);
+ }
+ public FSDataInputStreamWrapper(FileSystem fs, FileLink link,
+ boolean dropBehind) throws IOException {
+ this(fs, link, null, dropBehind);
}
- private FSDataInputStreamWrapper(FileSystem fs, FileLink link, Path path) throws IOException {
+ private FSDataInputStreamWrapper(FileSystem fs, FileLink link,
+ Path path, boolean dropBehind) throws IOException {
assert (path == null) != (link == null);
this.path = path;
this.link = link;
@@ -96,8 +109,25 @@ public class FSDataInputStreamWrapper {
// Initially we are going to read the tail block. Open the reader w/FS checksum.
this.useHBaseChecksumConfigured = this.useHBaseChecksum = false;
this.stream = (link != null) ? link.open(hfs) : hfs.open(path);
+ try {
+ Class<? extends FSDataInputStream> inputStreamClass = this.stream.getClass();
+ try {
+ Method m = inputStreamClass.getDeclaredMethod("setDropBehind",
+ new Class[] { boolean.class });
+ m.invoke(stream, new Object[] { dropBehind });
+ } catch (NoSuchMethodException e) {
+ // Not supported, we can just ignore it
+ } catch (Exception e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Failed to invoke input stream's setDropBehind method, continuing");
+ }
+ }
+ } catch (Exception e) {
+ // Best effort only: a failure to set drop-behind must not fail opening the stream.
+ }
}
+
/**
* Prepares the streams for block reader. NOT THREAD SAFE. Must be called once, after any
* reads finish and before any other reads start (what happens in reality is we read the
http://git-wip-us.apache.org/repos/asf/hbase/blob/672d74e7/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
index 376df07..d156521 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
@@ -116,6 +116,10 @@ public class CacheConfig {
*/
public static final String BLOCKCACHE_BLOCKSIZE_KEY = "hbase.offheapcache.minblocksize";
+ private static final String DROP_BEHIND_CACHE_COMPACTION_KEY =
+ "hbase.hfile.drop.behind.compaction";
+ private static final boolean DROP_BEHIND_CACHE_COMPACTION_DEFAULT = false;
+
// Defaults
public static final boolean DEFAULT_CACHE_DATA_ON_READ = true;
@@ -157,6 +161,9 @@ public class CacheConfig {
/** Whether data blocks should be prefetched into the cache */
private final boolean prefetchOnOpen;
+ /** Whether or not to drop file data from the OS page cache behind a compaction */
+ private final boolean dropBehindCompaction;
+
/**
* Create a cache configuration using the specified configuration object and
* family descriptor.
@@ -179,7 +186,8 @@ public class CacheConfig {
DEFAULT_EVICT_ON_CLOSE) || family.shouldEvictBlocksOnClose(),
conf.getBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, DEFAULT_CACHE_DATA_COMPRESSED),
conf.getBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY,
- DEFAULT_PREFETCH_ON_OPEN) || family.shouldPrefetchBlocksOnOpen()
+ DEFAULT_PREFETCH_ON_OPEN) || family.shouldPrefetchBlocksOnOpen(),
+ conf.getBoolean(DROP_BEHIND_CACHE_COMPACTION_KEY,DROP_BEHIND_CACHE_COMPACTION_DEFAULT)
);
}
@@ -198,8 +206,9 @@ public class CacheConfig {
conf.getBoolean(CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_BLOOMS_ON_WRITE),
conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE),
conf.getBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, DEFAULT_CACHE_DATA_COMPRESSED),
- conf.getBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY, DEFAULT_PREFETCH_ON_OPEN)
- );
+ conf.getBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY, DEFAULT_PREFETCH_ON_OPEN),
+ conf.getBoolean(DROP_BEHIND_CACHE_COMPACTION_KEY,DROP_BEHIND_CACHE_COMPACTION_DEFAULT)
+ );
}
/**
@@ -219,7 +228,8 @@ public class CacheConfig {
final boolean cacheDataOnRead, final boolean inMemory,
final boolean cacheDataOnWrite, final boolean cacheIndexesOnWrite,
final boolean cacheBloomsOnWrite, final boolean evictOnClose,
- final boolean cacheDataCompressed, final boolean prefetchOnOpen) {
+ final boolean cacheDataCompressed, final boolean prefetchOnOpen,
+ final boolean dropBehindCompaction) {
this.blockCache = blockCache;
this.cacheDataOnRead = cacheDataOnRead;
this.inMemory = inMemory;
@@ -229,6 +239,7 @@ public class CacheConfig {
this.evictOnClose = evictOnClose;
this.cacheDataCompressed = cacheDataCompressed;
this.prefetchOnOpen = prefetchOnOpen;
+ this.dropBehindCompaction = dropBehindCompaction;
}
/**
@@ -239,7 +250,8 @@ public class CacheConfig {
this(cacheConf.blockCache, cacheConf.cacheDataOnRead, cacheConf.inMemory,
cacheConf.cacheDataOnWrite, cacheConf.cacheIndexesOnWrite,
cacheConf.cacheBloomsOnWrite, cacheConf.evictOnClose,
- cacheConf.cacheDataCompressed, cacheConf.prefetchOnOpen);
+ cacheConf.cacheDataCompressed, cacheConf.prefetchOnOpen,
+ cacheConf.dropBehindCompaction);
}
/**
@@ -265,6 +277,10 @@ public class CacheConfig {
return isBlockCacheEnabled() && cacheDataOnRead;
}
+ public boolean shouldDropBehindCompaction() {
+ return dropBehindCompaction;
+ }
+
/**
* Should we cache a block of a particular category? We always cache
* important blocks such as index blocks, as long as the block cache is
http://git-wip-us.apache.org/repos/asf/hbase/blob/672d74e7/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
index 748d61d..fbf28ff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
@@ -25,6 +25,7 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.SequenceInputStream;
+import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -256,6 +257,7 @@ public class HFile {
protected KVComparator comparator = KeyValue.COMPARATOR;
protected InetSocketAddress[] favoredNodes;
private HFileContext fileContext;
+ protected boolean shouldDropBehind = false;
WriterFactory(Configuration conf, CacheConfig cacheConf) {
this.conf = conf;
@@ -293,6 +295,12 @@ public class HFile {
return this;
}
+ public WriterFactory withShouldDropCacheBehind(boolean shouldDropBehind) {
+ this.shouldDropBehind = shouldDropBehind;
+ return this;
+ }
+
+
public Writer create() throws IOException {
if ((path != null ? 1 : 0) + (ostream != null ? 1 : 0) != 1) {
throw new AssertionError("Please specify exactly one of " +
@@ -300,6 +308,23 @@ public class HFile {
}
if (path != null) {
ostream = AbstractHFileWriter.createOutputStream(conf, fs, path, favoredNodes);
+ try {
+ Class<? extends FSDataOutputStream> outStreamClass = ostream.getClass();
+ try {
+ Method m = outStreamClass.getDeclaredMethod("setDropBehind",
+ new Class[]{ boolean.class });
+ m.invoke(ostream, new Object[] {
+ shouldDropBehind && cacheConf.shouldDropBehindCompaction() });
+ } catch (NoSuchMethodException e) {
+ // Not supported, we can just ignore it
+ } catch (Exception e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Failed to invoke output stream's setDropBehind method, continuing");
+ }
+ }
+ } catch (UnsupportedOperationException uoe) {
+ LOG.debug("Unable to set drop behind on " + path, uoe);
+ }
}
return createWriter(fs, path, ostream,
comparator, fileContext);
http://git-wip-us.apache.org/repos/asf/hbase/blob/672d74e7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
index a13de99..9dd9998 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
@@ -67,8 +67,11 @@ public class DefaultStoreFlusher extends StoreFlusher {
synchronized (flushLock) {
status.setStatus("Flushing " + store + ": creating writer");
// Write the map out to the disk
- writer = store.createWriterInTmp(
- snapshot.size(), store.getFamily().getCompression(), false, true, true);
+ writer = store.createWriterInTmp(snapshot.size(), store.getFamily().getCompression(),
+ /* isCompaction = */ false,
+ /* includeMVCCReadpoint = */ true,
+ /* includesTags = */ true,
+ /* shouldDropBehind = */ false);
writer.setTimeRangeTracker(snapshotTimeRangeTracker);
IOException e = null;
try {
http://git-wip-us.apache.org/repos/asf/hbase/blob/672d74e7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index f7d7749..da83e4b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -881,6 +881,15 @@ public class HStore implements Store {
return sf;
}
+ @Override
+ public StoreFile.Writer createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
+ boolean isCompaction, boolean includeMVCCReadpoint,
+ boolean includesTag)
+ throws IOException {
+ return createWriterInTmp(maxKeyCount, compression, isCompaction, includeMVCCReadpoint,
+ includesTag, false);
+ }
+
/*
* @param maxKeyCount
* @param compression Compression algorithm to use
@@ -891,7 +900,8 @@ public class HStore implements Store {
*/
@Override
public StoreFile.Writer createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
- boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag)
+ boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
+ boolean shouldDropBehind)
throws IOException {
final CacheConfig writerCacheConf;
if (isCompaction) {
@@ -916,6 +926,7 @@ public class HStore implements Store {
.withMaxKeyCount(maxKeyCount)
.withFavoredNodes(favoredNodes)
.withFileContext(hFileContext)
+ .withShouldDropCacheBehind(shouldDropBehind)
.build();
return w;
}
@@ -1014,9 +1025,8 @@ public class HStore implements Store {
// TODO this used to get the store files in descending order,
// but now we get them in ascending order, which I think is
// actually more correct, since memstore get put at the end.
- List<StoreFileScanner> sfScanners = StoreFileScanner
- .getScannersForStoreFiles(storeFilesToScan, cacheBlocks, usePread, isCompaction, matcher,
- readPt);
+ List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(storeFilesToScan,
+ cacheBlocks, usePread, isCompaction, false, matcher, readPt);
List<KeyValueScanner> scanners =
new ArrayList<KeyValueScanner>(sfScanners.size()+1);
scanners.addAll(sfScanners);
@@ -1088,7 +1098,7 @@ public class HStore implements Store {
CompactionThroughputController throughputController) throws IOException {
assert compaction != null;
List<StoreFile> sfs = null;
- CompactionRequest cr = compaction.getRequest();;
+ CompactionRequest cr = compaction.getRequest();
try {
// Do all sanity checking in here if we have a valid CompactionRequest
// because we need to clean up after it on the way out in a finally
@@ -2025,7 +2035,7 @@ public class HStore implements Store {
return new StoreFlusherImpl(cacheFlushId);
}
- private class StoreFlusherImpl implements StoreFlushContext {
+ private final class StoreFlusherImpl implements StoreFlushContext {
private long cacheFlushSeqNum;
private SortedSet<KeyValue> snapshot;
http://git-wip-us.apache.org/repos/asf/hbase/blob/672d74e7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index c9d101f..ec3d54a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -156,11 +156,28 @@ public interface Store extends HeapSize, StoreConfigInformation {
FileSystem getFileSystem();
- /*
+
+ /**
+ * @param maxKeyCount
+ * @param compression Compression algorithm to use
+ * @param isCompaction whether we are creating a new file in a compaction
+ * @param includeMVCCReadpoint whether we should write out the MVCC readpoint
+ * @return Writer for a new StoreFile in the tmp dir.
+ */
+ StoreFile.Writer createWriterInTmp(
+ long maxKeyCount,
+ Compression.Algorithm compression,
+ boolean isCompaction,
+ boolean includeMVCCReadpoint,
+ boolean includesTags
+ ) throws IOException;
+
+ /**
* @param maxKeyCount
* @param compression Compression algorithm to use
* @param isCompaction whether we are creating a new file in a compaction
* @param includeMVCCReadpoint whether we should out the MVCC readpoint
+ * @param shouldDropBehind should the writer drop caches behind writes
* @return Writer for a new StoreFile in the tmp dir.
*/
StoreFile.Writer createWriterInTmp(
@@ -168,9 +185,13 @@ public interface Store extends HeapSize, StoreConfigInformation {
Compression.Algorithm compression,
boolean isCompaction,
boolean includeMVCCReadpoint,
- boolean includesTags
+ boolean includesTags,
+ boolean shouldDropBehind
) throws IOException;
+
+
+
// Compaction oriented methods
boolean throttleCompaction(long compactionSize);
http://git-wip-us.apache.org/repos/asf/hbase/blob/672d74e7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index a90210a..bdf483d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -253,8 +253,8 @@ public class StoreFile {
}
/**
- * @return True if this is a StoreFile Reference; call after {@link #open()}
- * else may get wrong answer.
+ * @return True if this is a StoreFile Reference; call
+ * after {@link #open(boolean canUseDropBehind)} else may get wrong answer.
*/
public boolean isReference() {
return this.fileInfo.isReference();
@@ -367,13 +367,13 @@ public class StoreFile {
* @throws IOException
* @see #closeReader(boolean)
*/
- private Reader open() throws IOException {
+ private Reader open(boolean canUseDropBehind) throws IOException {
if (this.reader != null) {
throw new IllegalAccessError("Already open");
}
// Open the StoreFile.Reader
- this.reader = fileInfo.open(this.fs, this.cacheConf);
+ this.reader = fileInfo.open(this.fs, this.cacheConf, canUseDropBehind);
// Load up indices and fileinfo. This also loads Bloom filter type.
metadataMap = Collections.unmodifiableMap(this.reader.loadFileInfo());
@@ -463,14 +463,18 @@ public class StoreFile {
return this.reader;
}
+ public Reader createReader() throws IOException {
+ return createReader(false);
+ }
+
/**
* @return Reader for StoreFile. creates if necessary
* @throws IOException
*/
- public Reader createReader() throws IOException {
+ public Reader createReader(boolean canUseDropBehind) throws IOException {
if (this.reader == null) {
try {
- this.reader = open();
+ this.reader = open(canUseDropBehind);
} catch (IOException e) {
try {
this.closeReader(true);
@@ -547,6 +551,8 @@ public class StoreFile {
private Path filePath;
private InetSocketAddress[] favoredNodes;
private HFileContext fileContext;
+ private boolean shouldDropCacheBehind = false;
+
public WriterBuilder(Configuration conf, CacheConfig cacheConf,
FileSystem fs) {
this.conf = conf;
@@ -612,6 +618,11 @@ public class StoreFile {
this.fileContext = fileContext;
return this;
}
+
+ public WriterBuilder withShouldDropCacheBehind(boolean shouldDropCacheBehind) {
+ this.shouldDropCacheBehind = shouldDropCacheBehind;
+ return this;
+ }
/**
* Create a store file writer. Client is responsible for closing file when
* done. If metadata, add BEFORE closing using
@@ -1262,6 +1273,7 @@ public class StoreFile {
case ROWCOL:
key = bloomFilter.createBloomKey(row, rowOffset, rowLen, col,
colOffset, colLen);
+
break;
default:
http://git-wip-us.apache.org/repos/asf/hbase/blob/672d74e7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
index e4d0e83..5e46d94 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.HalfStoreFileReader;
import org.apache.hadoop.hbase.io.Reference;
-import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.util.FSUtils;
@@ -175,21 +174,24 @@ public class StoreFileInfo {
* @return The StoreFile.Reader for the file
*/
public StoreFile.Reader open(final FileSystem fs,
- final CacheConfig cacheConf) throws IOException {
+ final CacheConfig cacheConf, final boolean canUseDropBehind) throws IOException {
FSDataInputStreamWrapper in;
FileStatus status;
+ final boolean doDropBehind = canUseDropBehind && cacheConf.shouldDropBehindCompaction();
if (this.link != null) {
// HFileLink
- in = new FSDataInputStreamWrapper(fs, this.link);
+ in = new FSDataInputStreamWrapper(fs, this.link, doDropBehind);
status = this.link.getFileStatus(fs);
} else if (this.reference != null) {
// HFile Reference
Path referencePath = getReferredToFile(this.getPath());
- in = new FSDataInputStreamWrapper(fs, referencePath);
+ in = new FSDataInputStreamWrapper(fs, referencePath,
+ doDropBehind);
status = fs.getFileStatus(referencePath);
} else {
- in = new FSDataInputStreamWrapper(fs, this.getPath());
+ in = new FSDataInputStreamWrapper(fs, this.getPath(),
+ doDropBehind);
status = fileStatus;
}
long length = status.getLen();
http://git-wip-us.apache.org/repos/asf/hbase/blob/672d74e7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index e4116fd..d9e2eab 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -87,7 +87,7 @@ public class StoreFileScanner implements KeyValueScanner {
boolean cacheBlocks,
boolean usePread, long readPt) throws IOException {
return getScannersForStoreFiles(files, cacheBlocks,
- usePread, false, readPt);
+ usePread, false, false, readPt);
}
/**
@@ -97,7 +97,17 @@ public class StoreFileScanner implements KeyValueScanner {
Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
boolean isCompaction, long readPt) throws IOException {
return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction,
- null, readPt);
+ false, null, readPt);
+ }
+
+ /**
+ * Return a list of scanners corresponding to the given set of store files.
+ */
+ public static List<StoreFileScanner> getScannersForStoreFiles(
+ Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
+ boolean isCompaction, boolean useDropBehind, long readPt) throws IOException {
+ return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction,
+ useDropBehind, null, readPt);
}
/**
@@ -108,10 +118,23 @@ public class StoreFileScanner implements KeyValueScanner {
public static List<StoreFileScanner> getScannersForStoreFiles(
Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
boolean isCompaction, ScanQueryMatcher matcher, long readPt) throws IOException {
+ return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction, false,
+ matcher, readPt);
+ }
+
+ /**
+ * Return a list of scanners corresponding to the given set of store files,
+ * and set the ScanQueryMatcher for each store file scanner for further
+ * optimization.
+ */
+ public static List<StoreFileScanner> getScannersForStoreFiles(
+ Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
+ boolean isCompaction, boolean canUseDrop,
+ ScanQueryMatcher matcher, long readPt) throws IOException {
List<StoreFileScanner> scanners = new ArrayList<StoreFileScanner>(
files.size());
for (StoreFile file : files) {
- StoreFile.Reader r = file.createReader();
+ StoreFile.Reader r = file.createReader(canUseDrop);
StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread,
isCompaction, readPt);
scanner.setScanQueryMatcher(matcher);
http://git-wip-us.apache.org/repos/asf/hbase/blob/672d74e7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
index b03c8aa..a52b1bf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
@@ -115,7 +115,11 @@ public class StripeStoreFlusher extends StoreFlusher {
@Override
public Writer createWriter() throws IOException {
StoreFile.Writer writer = store.createWriterInTmp(
- kvCount, store.getFamily().getCompression(), false, true, true);
+ kvCount, store.getFamily().getCompression(),
+ /* isCompaction = */ false,
+ /* includeMVCCReadpoint = */ true,
+ /* includesTags = */ true,
+ /* shouldDropBehind = */ false);
writer.setTimeRangeTracker(tracker);
return writer;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/672d74e7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index c4be217..42e4953 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -163,8 +163,14 @@ public abstract class Compactor {
* @return Scanners.
*/
protected List<StoreFileScanner> createFileScanners(
- final Collection<StoreFile> filesToCompact, long smallestReadPoint) throws IOException {
- return StoreFileScanner.getScannersForStoreFiles(filesToCompact, false, false, true,
+ final Collection<StoreFile> filesToCompact,
+ long smallestReadPoint,
+ boolean useDropBehind) throws IOException {
+ return StoreFileScanner.getScannersForStoreFiles(filesToCompact,
+ /* cache blocks = */ false,
+ /* use pread = */ false,
+ /* is compaction */ true,
+ /* use Drop Behind */ useDropBehind,
smallestReadPoint);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/672d74e7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index b505efc..dfbd3f4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@ -60,17 +60,19 @@ public class DefaultCompactor extends Compactor {
List<StoreFileScanner> scanners;
Collection<StoreFile> readersToClose;
- if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", false)) {
+ if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", true)) {
// clone all StoreFiles, so we'll do the compaction on a independent copy of StoreFiles,
// HFileFiles, and their readers
readersToClose = new ArrayList<StoreFile>(request.getFiles().size());
for (StoreFile f : request.getFiles()) {
readersToClose.add(new StoreFile(f));
}
- scanners = createFileScanners(readersToClose, smallestReadPoint);
+ scanners = createFileScanners(readersToClose, smallestReadPoint,
+ store.throttleCompaction(request.getSize()));
} else {
readersToClose = Collections.emptyList();
- scanners = createFileScanners(request.getFiles(), smallestReadPoint);
+ scanners = createFileScanners(request.getFiles(), smallestReadPoint,
+ store.throttleCompaction(request.getSize()));
}
StoreFile.Writer writer = null;
@@ -94,14 +96,16 @@ public class DefaultCompactor extends Compactor {
// Create the writer even if no kv(Empty store file is also ok),
// because we need record the max seq id for the store file, see HBASE-6059
writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
- fd.maxMVCCReadpoint >= smallestReadPoint, fd.maxTagsLength > 0);
+ fd.maxMVCCReadpoint >= smallestReadPoint, fd.maxTagsLength > 0, store.throttleCompaction(request.getSize()));
+
boolean finished =
performCompaction(scanner, writer, smallestReadPoint, throughputController);
+
if (!finished) {
writer.close();
store.getFileSystem().delete(writer.getPath(), false);
writer = null;
- throw new InterruptedIOException( "Aborting compaction of store " + store +
+ throw new InterruptedIOException("Aborting compaction of store " + store +
" in region " + store.getRegionInfo().getRegionNameAsString() +
" because it was interrupted.");
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/672d74e7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
index 7a5cbb4..342ecce 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
@@ -81,7 +81,7 @@ public class StripeCompactor extends Compactor {
throughputController);
}
- private List<Path> compactInternal(StripeMultiFileWriter mw, CompactionRequest request,
+ private List<Path> compactInternal(StripeMultiFileWriter mw, final CompactionRequest request,
byte[] majorRangeFromRow, byte[] majorRangeToRow,
CompactionThroughputController throughputController) throws IOException {
final Collection<StoreFile> filesToCompact = request.getFiles();
@@ -89,7 +89,8 @@ public class StripeCompactor extends Compactor {
this.progress = new CompactionProgress(fd.maxKeyCount);
long smallestReadPoint = getSmallestReadPoint();
- List<StoreFileScanner> scanners = createFileScanners(filesToCompact, smallestReadPoint);
+ List<StoreFileScanner> scanners = createFileScanners(filesToCompact,
+ smallestReadPoint, store.throttleCompaction(request.getSize()));
boolean finished = false;
InternalScanner scanner = null;
@@ -117,7 +118,8 @@ public class StripeCompactor extends Compactor {
@Override
public Writer createWriter() throws IOException {
return store.createWriterInTmp(
- fd.maxKeyCount, compression, true, needMvcc, fd.maxTagsLength > 0);
+ fd.maxKeyCount, compression, true, needMvcc, fd.maxTagsLength > 0,
+ store.throttleCompaction(request.getSize()));
}
};
http://git-wip-us.apache.org/repos/asf/hbase/blob/672d74e7/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
index 3952fa4..d7f18dd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
@@ -245,7 +245,7 @@ public class TestCacheOnWrite {
cacheConf =
new CacheConfig(blockCache, true, true, cowType.shouldBeCached(BlockType.DATA),
cowType.shouldBeCached(BlockType.LEAF_INDEX),
- cowType.shouldBeCached(BlockType.BLOOM_CHUNK), false, cacheCompressedData, false);
+ cowType.shouldBeCached(BlockType.BLOOM_CHUNK), false, cacheCompressedData, false, false);
}
@After
http://git-wip-us.apache.org/repos/asf/hbase/blob/672d74e7/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java
index 6d7d675..94a5ba7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java
@@ -142,7 +142,7 @@ public class TestFSErrorsExposed {
cacheConf, BloomType.NONE);
List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(
- Collections.singletonList(sf), false, true, false,
+ Collections.singletonList(sf), false, true, false, false,
// 0 is passed as readpoint because this test operates on StoreFile directly
0);
KeyValueScanner scanner = scanners.get(0);
http://git-wip-us.apache.org/repos/asf/hbase/blob/672d74e7/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
index 14229de..b7bc6f2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
@@ -104,14 +104,14 @@ public class TestReversibleScanners {
TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE);
List<StoreFileScanner> scanners = StoreFileScanner
- .getScannersForStoreFiles(Collections.singletonList(sf), false, true,
- false, Long.MAX_VALUE);
+ .getScannersForStoreFiles(Collections.singletonList(sf),
+ false, true, false, false, Long.MAX_VALUE);
StoreFileScanner scanner = scanners.get(0);
seekTestOfReversibleKeyValueScanner(scanner);
for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
LOG.info("Setting read point to " + readPoint);
scanners = StoreFileScanner.getScannersForStoreFiles(
- Collections.singletonList(sf), false, true, false, readPoint);
+ Collections.singletonList(sf), false, true, false, false, readPoint);
seekTestOfReversibleKeyValueScannerWithMVCC(scanners.get(0), readPoint);
}
}
@@ -493,7 +493,7 @@ public class TestReversibleScanners {
throws IOException {
List<StoreFileScanner> fileScanners = StoreFileScanner
.getScannersForStoreFiles(Lists.newArrayList(sf1, sf2), false, true,
- false, readPoint);
+ false, false, readPoint);
List<KeyValueScanner> memScanners = memstore.getScanners(readPoint);
List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(
fileScanners.size() + 1);
http://git-wip-us.apache.org/repos/asf/hbase/blob/672d74e7/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
index 5471ee5..fb123d5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
@@ -186,7 +186,7 @@ public class TestStripeCompactor {
when(store.getFileSystem()).thenReturn(mock(FileSystem.class));
when(store.getRegionInfo()).thenReturn(new HRegionInfo(TABLE_NAME));
when(store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class),
- anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
+ anyBoolean(), anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
when(store.getComparator()).thenReturn(new KVComparator());
return new StripeCompactor(conf, store) {
@@ -217,6 +217,7 @@ public class TestStripeCompactor {
.thenReturn(mock(StoreFileScanner.class));
when(sf.getReader()).thenReturn(r);
when(sf.createReader()).thenReturn(r);
+ when(sf.createReader(anyBoolean())).thenReturn(r);
return new CompactionRequest(Arrays.asList(sf));
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/672d74e7/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
index 8c5c67a..33c28a1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
@@ -723,6 +723,7 @@ public class TestStripeCompactionPolicy {
when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong())).thenReturn(
mock(StoreFileScanner.class));
when(sf.getReader()).thenReturn(r);
+ when(sf.createReader(anyBoolean())).thenReturn(r);
when(sf.createReader()).thenReturn(r);
return sf;
}
@@ -746,7 +747,7 @@ public class TestStripeCompactionPolicy {
when(store.getRegionInfo()).thenReturn(info);
when(
store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class), anyBoolean(),
- anyBoolean(), anyBoolean())).thenAnswer(writers);
+ anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
Configuration conf = HBaseConfiguration.create();
final Scanner scanner = new Scanner();