Posted to commits@hbase.apache.org by ec...@apache.org on 2015/08/13 04:47:20 UTC
hbase git commit: HBASE-14098 Allow dropping caches behind compactions
Repository: hbase
Updated Branches:
refs/heads/branch-1.2 222102196 -> 34b706af4
HBASE-14098 Allow dropping caches behind compactions
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/34b706af
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/34b706af
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/34b706af
Branch: refs/heads/branch-1.2
Commit: 34b706af4d44ad7dff8ac5f35eec304d7dc0ccab
Parents: 2221021
Author: Elliott Clark <ec...@apache.org>
Authored: Wed Aug 12 14:32:48 2015 -0700
Committer: Elliott Clark <ec...@apache.org>
Committed: Wed Aug 12 19:43:16 2015 -0700
----------------------------------------------------------------------
.../hbase/io/FSDataInputStreamWrapper.java | 22 +++++++++++++----
.../hadoop/hbase/io/hfile/CacheConfig.java | 19 +++++++++++----
.../org/apache/hadoop/hbase/io/hfile/HFile.java | 12 ++++++++++
.../hbase/regionserver/DefaultStoreFlusher.java | 7 ++++--
.../hadoop/hbase/regionserver/HStore.java | 22 ++++++++++++-----
.../apache/hadoop/hbase/regionserver/Store.java | 25 ++++++++++++++++++--
.../hadoop/hbase/regionserver/StoreFile.java | 24 ++++++++++++++-----
.../hbase/regionserver/StoreFileInfo.java | 11 +++++----
.../hbase/regionserver/StoreFileScanner.java | 11 +++++----
.../hbase/regionserver/StripeStoreFlusher.java | 6 ++++-
.../regionserver/compactions/Compactor.java | 10 ++++++--
.../compactions/DefaultCompactor.java | 16 +++++++++----
.../compactions/StripeCompactor.java | 8 ++++---
.../hadoop/hbase/io/hfile/TestCacheOnWrite.java | 3 ++-
.../hbase/regionserver/TestFSErrorsExposed.java | 2 +-
.../regionserver/TestReversibleScanners.java | 8 +++----
.../hbase/regionserver/TestStripeCompactor.java | 3 ++-
.../compactions/TestStripeCompactionPolicy.java | 3 ++-
18 files changed, 160 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
index 5950585..b06be6b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.fs.HFileSystem;
-import org.apache.hadoop.hbase.io.FileLink;
import com.google.common.annotations.VisibleForTesting;
@@ -76,14 +75,23 @@ public class FSDataInputStreamWrapper {
private volatile int hbaseChecksumOffCount = -1;
public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException {
- this(fs, null, path);
+ this(fs, null, path, false);
+ }
+
+ public FSDataInputStreamWrapper(FileSystem fs, Path path, boolean dropBehind) throws IOException {
+ this(fs, null, path, dropBehind);
}
public FSDataInputStreamWrapper(FileSystem fs, FileLink link) throws IOException {
- this(fs, link, null);
+ this(fs, link, null, false);
+ }
+ public FSDataInputStreamWrapper(FileSystem fs, FileLink link,
+ boolean dropBehind) throws IOException {
+ this(fs, link, null, dropBehind);
}
- private FSDataInputStreamWrapper(FileSystem fs, FileLink link, Path path) throws IOException {
+ private FSDataInputStreamWrapper(FileSystem fs, FileLink link,
+ Path path, boolean dropBehind) throws IOException {
assert (path == null) != (link == null);
this.path = path;
this.link = link;
@@ -96,8 +104,14 @@ public class FSDataInputStreamWrapper {
// Initially we are going to read the tail block. Open the reader w/FS checksum.
this.useHBaseChecksumConfigured = this.useHBaseChecksum = false;
this.stream = (link != null) ? link.open(hfs) : hfs.open(path);
+ try {
+ this.stream.setDropBehind(dropBehind);
+ } catch (Exception e) {
+ // Drop-behind is a best-effort hint; not every stream supports it, so ignore failures.
+ }
}
+
/**
* Prepares the streams for block reader. NOT THREAD SAFE. Must be called once, after any
* reads finish and before any other reads start (what happens in reality is we read the
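The wrapper constructors now thread a dropBehind flag down to the stream they open. setDropBehind is only a hint to the filesystem, and not every FileSystem implementation supports it, which is why the call above is wrapped in a try/catch. A minimal standalone sketch of the same pattern (the path and configuration here are illustrative, not taken from this commit):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class DropBehindReadSketch {
      public static void main(String[] args) throws IOException {
        FileSystem fs = FileSystem.get(new Configuration());
        try (FSDataInputStream in = fs.open(new Path("/tmp/example.hfile"))) {
          try {
            // Hint to the OS that these pages need not stay in the buffer cache.
            in.setDropBehind(true);
          } catch (UnsupportedOperationException | IOException e) {
            // The hint is best effort; reads proceed normally without it.
          }
          byte[] buf = new byte[4096];
          in.read(buf);
        }
      }
    }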
http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
index 26eb1da..ee2d001 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
@@ -131,6 +131,8 @@ public class CacheConfig {
private static final boolean EXTERNAL_BLOCKCACHE_DEFAULT = false;
private static final String EXTERNAL_BLOCKCACHE_CLASS_KEY="hbase.blockcache.external.class";
+ private static final String DROP_BEHIND_CACHE_COMPACTION_KEY = "hbase.hfile.drop.behind.compaction";
+ private static final boolean DROP_BEHIND_CACHE_COMPACTION_DEFAULT = true;
/**
* Enum of all built in external block caches.
@@ -194,6 +196,8 @@ public class CacheConfig {
*/
private boolean cacheDataInL1;
+ private final boolean dropBehindCompaction;
+
/**
* Create a cache configuration using the specified configuration object and
* family descriptor.
@@ -218,7 +222,8 @@ public class CacheConfig {
conf.getBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY,
DEFAULT_PREFETCH_ON_OPEN) || family.isPrefetchBlocksOnOpen(),
conf.getBoolean(HColumnDescriptor.CACHE_DATA_IN_L1,
- HColumnDescriptor.DEFAULT_CACHE_DATA_IN_L1) || family.isCacheDataInL1()
+ HColumnDescriptor.DEFAULT_CACHE_DATA_IN_L1) || family.isCacheDataInL1(),
+ conf.getBoolean(DROP_BEHIND_CACHE_COMPACTION_KEY, DROP_BEHIND_CACHE_COMPACTION_DEFAULT)
);
}
@@ -239,7 +244,8 @@ public class CacheConfig {
conf.getBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, DEFAULT_CACHE_DATA_COMPRESSED),
conf.getBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY, DEFAULT_PREFETCH_ON_OPEN),
conf.getBoolean(HColumnDescriptor.CACHE_DATA_IN_L1,
- HColumnDescriptor.DEFAULT_CACHE_DATA_IN_L1)
+ HColumnDescriptor.DEFAULT_CACHE_DATA_IN_L1),
+ conf.getBoolean(DROP_BEHIND_CACHE_COMPACTION_KEY, DROP_BEHIND_CACHE_COMPACTION_DEFAULT)
);
}
@@ -264,7 +270,7 @@ public class CacheConfig {
final boolean cacheDataOnWrite, final boolean cacheIndexesOnWrite,
final boolean cacheBloomsOnWrite, final boolean evictOnClose,
final boolean cacheDataCompressed, final boolean prefetchOnOpen,
- final boolean cacheDataInL1) {
+ final boolean cacheDataInL1, final boolean dropBehindCompaction) {
this.blockCache = blockCache;
this.cacheDataOnRead = cacheDataOnRead;
this.inMemory = inMemory;
@@ -275,6 +281,7 @@ public class CacheConfig {
this.cacheDataCompressed = cacheDataCompressed;
this.prefetchOnOpen = prefetchOnOpen;
this.cacheDataInL1 = cacheDataInL1;
+ this.dropBehindCompaction = dropBehindCompaction;
LOG.info(this);
}
@@ -287,7 +294,7 @@ public class CacheConfig {
cacheConf.cacheDataOnWrite, cacheConf.cacheIndexesOnWrite,
cacheConf.cacheBloomsOnWrite, cacheConf.evictOnClose,
cacheConf.cacheDataCompressed, cacheConf.prefetchOnOpen,
- cacheConf.cacheDataInL1);
+ cacheConf.cacheDataInL1, cacheConf.dropBehindCompaction);
}
/**
@@ -314,6 +321,10 @@ public class CacheConfig {
return isBlockCacheEnabled() && cacheDataOnRead;
}
+ public boolean shouldDropBehindCompaction() {
+ return dropBehindCompaction;
+ }
+
/**
* Should we cache a block of a particular category? We always cache
* important blocks such as index blocks, as long as the block cache is
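The whole feature is gated by the new hbase.hfile.drop.behind.compaction key, which defaults to true. Operators who want compactions to keep populating the OS page cache can opt out. A minimal sketch of doing so programmatically (in a real deployment the key would be set in hbase-site.xml):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class DropBehindConfigSketch {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // DROP_BEHIND_CACHE_COMPACTION_DEFAULT above makes this true when unset;
        // setting false disables drop-behind for compaction readers and writers.
        conf.setBoolean("hbase.hfile.drop.behind.compaction", false);
      }
    }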
http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
index 0b06c33..6741957 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
@@ -248,6 +248,7 @@ public class HFile {
protected KVComparator comparator = KeyValue.COMPARATOR;
protected InetSocketAddress[] favoredNodes;
private HFileContext fileContext;
+ protected boolean shouldDropBehind = false;
WriterFactory(Configuration conf, CacheConfig cacheConf) {
this.conf = conf;
@@ -285,6 +286,12 @@ public class HFile {
return this;
}
+ public WriterFactory withShouldDropCacheBehind(boolean shouldDropBehind) {
+ this.shouldDropBehind = shouldDropBehind;
+ return this;
+ }
+
public Writer create() throws IOException {
if ((path != null ? 1 : 0) + (ostream != null ? 1 : 0) != 1) {
throw new AssertionError("Please specify exactly one of " +
@@ -292,6 +299,11 @@ public class HFile {
}
if (path != null) {
ostream = AbstractHFileWriter.createOutputStream(conf, fs, path, favoredNodes);
+ try {
+ ostream.setDropBehind(shouldDropBehind && cacheConf.shouldDropBehindCompaction());
+ } catch (UnsupportedOperationException uoe) {
+ LOG.debug("Unable to set drop behind on " + path, uoe);
+ }
}
return createWriter(fs, path, ostream,
comparator, fileContext);
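Note the guard in create(): the writer asks for drop-behind only when both the per-writer flag and the CacheConfig gate are true, and an unsupported hint is downgraded to a debug log rather than failing the write. A hypothetical caller of the new factory hook (conf, cacheConf, fs, path, and fileContext are assumed to be in scope; this wiring is not part of the hunk):

    // Sketch only: opting a single HFile writer into drop-behind.
    HFile.Writer w = HFile.getWriterFactory(conf, cacheConf)
        .withPath(fs, path)
        .withFileContext(fileContext)
        .withShouldDropCacheBehind(true) // still subject to the config gate
        .create();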
http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
index 474a44a..da89129 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
@@ -63,8 +63,11 @@ public class DefaultStoreFlusher extends StoreFlusher {
synchronized (flushLock) {
status.setStatus("Flushing " + store + ": creating writer");
// Write the map out to the disk
- writer = store.createWriterInTmp(
- cellsCount, store.getFamily().getCompression(), false, true, snapshot.isTagsPresent());
+ writer = store.createWriterInTmp(cellsCount, store.getFamily().getCompression(),
+ /* isCompaction = */ false,
+ /* includeMVCCReadpoint = */ true,
+ /* includesTags = */ snapshot.isTagsPresent(),
+ /* shouldDropBehind = */ false);
writer.setTimeRangeTracker(snapshot.getTimeRangeTracker());
IOException e = null;
try {
http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 0c6b2f0..e15db38 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -966,6 +966,15 @@ public class HStore implements Store {
return sf;
}
+ @Override
+ public StoreFile.Writer createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
+ boolean isCompaction, boolean includeMVCCReadpoint,
+ boolean includesTag)
+ throws IOException {
+ return createWriterInTmp(maxKeyCount, compression, isCompaction, includeMVCCReadpoint,
+ includesTag, false);
+ }
+
/*
* @param maxKeyCount
* @param compression Compression algorithm to use
@@ -976,7 +985,8 @@ public class HStore implements Store {
*/
@Override
public StoreFile.Writer createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
- boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag)
+ boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
+ boolean shouldDropBehind)
throws IOException {
final CacheConfig writerCacheConf;
if (isCompaction) {
@@ -1001,6 +1011,7 @@ public class HStore implements Store {
.withMaxKeyCount(maxKeyCount)
.withFavoredNodes(favoredNodes)
.withFileContext(hFileContext)
+ .withShouldDropCacheBehind(shouldDropBehind)
.build();
return w;
}
@@ -1102,9 +1113,8 @@ public class HStore implements Store {
// TODO this used to get the store files in descending order,
// but now we get them in ascending order, which I think is
// actually more correct, since memstore get put at the end.
- List<StoreFileScanner> sfScanners = StoreFileScanner
- .getScannersForStoreFiles(storeFilesToScan, cacheBlocks, usePread, isCompaction, matcher,
- readPt);
+ List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(storeFilesToScan,
+ cacheBlocks, usePread, isCompaction, false, matcher, readPt);
List<KeyValueScanner> scanners =
new ArrayList<KeyValueScanner>(sfScanners.size()+1);
scanners.addAll(sfScanners);
@@ -1176,7 +1186,7 @@ public class HStore implements Store {
CompactionThroughputController throughputController) throws IOException {
assert compaction != null;
List<StoreFile> sfs = null;
- CompactionRequest cr = compaction.getRequest();;
+ CompactionRequest cr = compaction.getRequest();
try {
// Do all sanity checking in here if we have a valid CompactionRequest
// because we need to clean up after it on the way out in a finally
@@ -2151,7 +2161,7 @@ public class HStore implements Store {
return new StoreFlusherImpl(cacheFlushId);
}
- private class StoreFlusherImpl implements StoreFlushContext {
+ private final class StoreFlusherImpl implements StoreFlushContext {
private long cacheFlushSeqNum;
private MemStoreSnapshot snapshot;
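The existing five-argument createWriterInTmp survives as a delegating overload that passes false, so flushes and other current callers keep today's behavior and only compaction paths opt in. A sketch of an opted-in call (store, fd, and compactionCompression are assumed to be in scope, as in DefaultCompactor below):

    // Sketch only: a compaction writer that drops cache behind writes.
    StoreFile.Writer w = store.createWriterInTmp(
        fd.maxKeyCount,
        compactionCompression,
        /* isCompaction = */ true,
        /* includeMVCCReadpoint = */ fd.maxMVCCReadpoint > 0,
        /* includesTag = */ fd.maxTagsLength > 0,
        /* shouldDropBehind = */ true);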
http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index da2cb10..5a13ba8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -159,11 +159,28 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
FileSystem getFileSystem();
- /*
+
+ /**
+ * @param maxKeyCount
+ * @param compression Compression algorithm to use
+ * @param isCompaction whether we are creating a new file in a compaction
+ * @param includeMVCCReadpoint whether we should write out the MVCC readpoint
+ * @return Writer for a new StoreFile in the tmp dir.
+ */
+ StoreFile.Writer createWriterInTmp(
+ long maxKeyCount,
+ Compression.Algorithm compression,
+ boolean isCompaction,
+ boolean includeMVCCReadpoint,
+ boolean includesTags
+ ) throws IOException;
+
+ /**
* @param maxKeyCount
* @param compression Compression algorithm to use
* @param isCompaction whether we are creating a new file in a compaction
* @param includeMVCCReadpoint whether we should write out the MVCC readpoint
+ * @param shouldDropBehind should the writer drop caches behind writes
* @return Writer for a new StoreFile in the tmp dir.
*/
StoreFile.Writer createWriterInTmp(
@@ -171,9 +188,13 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
Compression.Algorithm compression,
boolean isCompaction,
boolean includeMVCCReadpoint,
- boolean includesTags
+ boolean includesTags,
+ boolean shouldDropBehind
) throws IOException;
+
// Compaction oriented methods
boolean throttleCompaction(long compactionSize);
http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index dd86a5d..acd4233 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -247,8 +247,8 @@ public class StoreFile {
}
/**
- * @return True if this is a StoreFile Reference; call after {@link #open()}
- * else may get wrong answer.
+ * @return True if this is a StoreFile Reference; call
+ * after {@link #open(boolean)}, else you may get the wrong answer.
*/
public boolean isReference() {
return this.fileInfo.isReference();
@@ -366,13 +366,13 @@ public class StoreFile {
* @throws IOException
* @see #closeReader(boolean)
*/
- private Reader open() throws IOException {
+ private Reader open(boolean canUseDropBehind) throws IOException {
if (this.reader != null) {
throw new IllegalAccessError("Already open");
}
// Open the StoreFile.Reader
- this.reader = fileInfo.open(this.fs, this.cacheConf);
+ this.reader = fileInfo.open(this.fs, this.cacheConf, canUseDropBehind);
// Load up indices and fileinfo. This also loads Bloom filter type.
metadataMap = Collections.unmodifiableMap(this.reader.loadFileInfo());
@@ -462,14 +462,18 @@ public class StoreFile {
return this.reader;
}
+ public Reader createReader() throws IOException {
+ return createReader(false);
+ }
+
/**
* @return Reader for StoreFile. creates if necessary
* @throws IOException
*/
- public Reader createReader() throws IOException {
+ public Reader createReader(boolean canUseDropBehind) throws IOException {
if (this.reader == null) {
try {
- this.reader = open();
+ this.reader = open(canUseDropBehind);
} catch (IOException e) {
try {
this.closeReader(true);
@@ -546,6 +550,8 @@ public class StoreFile {
private Path filePath;
private InetSocketAddress[] favoredNodes;
private HFileContext fileContext;
+ private boolean shouldDropCacheBehind = false;
+
public WriterBuilder(Configuration conf, CacheConfig cacheConf,
FileSystem fs) {
this.conf = conf;
@@ -611,6 +617,11 @@ public class StoreFile {
this.fileContext = fileContext;
return this;
}
+
+ public WriterBuilder withShouldDropCacheBehind(boolean shouldDropCacheBehind) {
+ this.shouldDropCacheBehind = shouldDropCacheBehind;
+ return this;
+ }
/**
* Create a store file writer. Client is responsible for closing file when
* done. If metadata, add BEFORE closing using
@@ -1253,6 +1264,7 @@ public class StoreFile {
case ROWCOL:
key = bloomFilter.createBloomKey(row, rowOffset, rowLen, col,
colOffset, colLen);
break;
default:
http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
index 6516a3e..54f200f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
@@ -218,21 +218,24 @@ public class StoreFileInfo {
* @return The StoreFile.Reader for the file
*/
public StoreFile.Reader open(final FileSystem fs,
- final CacheConfig cacheConf) throws IOException {
+ final CacheConfig cacheConf, final boolean canUseDropBehind) throws IOException {
FSDataInputStreamWrapper in;
FileStatus status;
+ final boolean doDropBehind = canUseDropBehind && cacheConf.shouldDropBehindCompaction();
if (this.link != null) {
// HFileLink
- in = new FSDataInputStreamWrapper(fs, this.link);
+ in = new FSDataInputStreamWrapper(fs, this.link, doDropBehind);
status = this.link.getFileStatus(fs);
} else if (this.reference != null) {
// HFile Reference
Path referencePath = getReferredToFile(this.getPath());
- in = new FSDataInputStreamWrapper(fs, referencePath);
+ in = new FSDataInputStreamWrapper(fs, referencePath,
+ doDropBehind);
status = fs.getFileStatus(referencePath);
} else {
- in = new FSDataInputStreamWrapper(fs, this.getPath());
+ in = new FSDataInputStreamWrapper(fs, this.getPath(),
+ doDropBehind);
status = fs.getFileStatus(initialPath);
}
long length = status.getLen();
http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index 961352d..dc22931 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -90,7 +90,7 @@ public class StoreFileScanner implements KeyValueScanner {
boolean cacheBlocks,
boolean usePread, long readPt) throws IOException {
return getScannersForStoreFiles(files, cacheBlocks,
- usePread, false, readPt);
+ usePread, false, false, readPt);
}
/**
@@ -98,9 +98,9 @@ public class StoreFileScanner implements KeyValueScanner {
*/
public static List<StoreFileScanner> getScannersForStoreFiles(
Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
- boolean isCompaction, long readPt) throws IOException {
+ boolean isCompaction, boolean useDropBehind, long readPt) throws IOException {
return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction,
- null, readPt);
+ useDropBehind, null, readPt);
}
/**
@@ -110,11 +110,12 @@ public class StoreFileScanner implements KeyValueScanner {
*/
public static List<StoreFileScanner> getScannersForStoreFiles(
Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
- boolean isCompaction, ScanQueryMatcher matcher, long readPt) throws IOException {
+ boolean isCompaction, boolean canUseDrop,
+ ScanQueryMatcher matcher, long readPt) throws IOException {
List<StoreFileScanner> scanners = new ArrayList<StoreFileScanner>(
files.size());
for (StoreFile file : files) {
- StoreFile.Reader r = file.createReader();
+ StoreFile.Reader r = file.createReader(canUseDrop);
StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread,
isCompaction, readPt);
scanner.setScanQueryMatcher(matcher);
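All three getScannersForStoreFiles overloads now carry a drop-behind boolean through to StoreFile.createReader(boolean), with the pre-existing signatures defaulting it to false. A sketch of the compaction-side call against the new seven-argument form (filesToCompact and smallestReadPoint are assumed to be in scope):

    // Sketch only: compaction scanners that may drop cache behind reads.
    List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(
        filesToCompact,
        /* cacheBlocks = */ false,
        /* usePread = */ false,
        /* isCompaction = */ true,
        /* canUseDrop = */ true,
        /* matcher = */ null,
        smallestReadPoint);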
http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
index 136934c..37e7402 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
@@ -109,7 +109,11 @@ public class StripeStoreFlusher extends StoreFlusher {
@Override
public Writer createWriter() throws IOException {
StoreFile.Writer writer = store.createWriterInTmp(
- kvCount, store.getFamily().getCompression(), false, true, true);
+ kvCount, store.getFamily().getCompression(),
+ /* isCompaction = */ false,
+ /* includeMVCCReadpoint = */ true,
+ /* includesTags = */ true,
+ /* shouldDropBehind = */ false);
writer.setTimeRangeTracker(tracker);
return writer;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index 2c34c70..a515b87 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -185,8 +185,14 @@ public abstract class Compactor {
* @return Scanners.
*/
protected List<StoreFileScanner> createFileScanners(
- final Collection<StoreFile> filesToCompact, long smallestReadPoint) throws IOException {
- return StoreFileScanner.getScannersForStoreFiles(filesToCompact, false, false, true,
+ final Collection<StoreFile> filesToCompact,
+ long smallestReadPoint,
+ boolean useDropBehind) throws IOException {
+ return StoreFileScanner.getScannersForStoreFiles(filesToCompact,
+ /* cacheBlocks = */ false,
+ /* usePread = */ false,
+ /* isCompaction = */ true,
+ /* useDropBehind = */ useDropBehind,
smallestReadPoint);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index bc8dd01..ed441d3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@ -60,17 +60,19 @@ public class DefaultCompactor extends Compactor {
List<StoreFileScanner> scanners;
Collection<StoreFile> readersToClose;
- if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", false)) {
+ if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", true)) {
// clone all StoreFiles, so we'll do the compaction on a independent copy of StoreFiles,
// HFileFiles, and their readers
readersToClose = new ArrayList<StoreFile>(request.getFiles().size());
for (StoreFile f : request.getFiles()) {
readersToClose.add(new StoreFile(f));
}
- scanners = createFileScanners(readersToClose, smallestReadPoint);
+ scanners = createFileScanners(readersToClose, smallestReadPoint,
+ store.throttleCompaction(request.getSize()));
} else {
readersToClose = Collections.emptyList();
- scanners = createFileScanners(request.getFiles(), smallestReadPoint);
+ scanners = createFileScanners(request.getFiles(), smallestReadPoint,
+ store.throttleCompaction(request.getSize()));
}
StoreFile.Writer writer = null;
@@ -81,6 +83,7 @@ public class DefaultCompactor extends Compactor {
InternalScanner scanner = null;
try {
/* Include deletes, unless we are doing a compaction of all files */
ScanType scanType =
request.isAllFiles() ? ScanType.COMPACT_DROP_DELETES : ScanType.COMPACT_RETAIN_DELETES;
scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners);
@@ -102,14 +105,17 @@ public class DefaultCompactor extends Compactor {
// When all MVCC readpoints are 0, don't write them.
// See HBASE-8166, HBASE-12600, and HBASE-13389.
writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
- fd.maxMVCCReadpoint > 0, fd.maxTagsLength > 0);
+ fd.maxMVCCReadpoint > 0, fd.maxTagsLength > 0, store.throttleCompaction(request.getSize()));
+
boolean finished =
performCompaction(scanner, writer, smallestReadPoint, cleanSeqId, throughputController);
if (!finished) {
writer.close();
store.getFileSystem().delete(writer.getPath(), false);
writer = null;
- throw new InterruptedIOException( "Aborting compaction of store " + store +
+ throw new InterruptedIOException("Aborting compaction of store " + store +
" in region " + store.getRegionInfo().getRegionNameAsString() +
" because it was interrupted.");
}
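Two details are worth noting in this hunk: the default for hbase.regionserver.compaction.private.readers flips from false to true, and the drop-behind decision reuses store.throttleCompaction(request.getSize()). A compaction large enough to be throttled is also the one most likely to churn the page cache, so exactly those compactions read and write with drop-behind:

    // Sketch of the decision as wired above: one size predicate now drives
    // both throughput throttling and cache drop-behind for a compaction.
    boolean useDropBehind = store.throttleCompaction(request.getSize());
    List<StoreFileScanner> scanners =
        createFileScanners(request.getFiles(), smallestReadPoint, useDropBehind);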
http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
index 10e3cf0..f11c259 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
@@ -81,7 +81,7 @@ public class StripeCompactor extends Compactor {
throughputController);
}
- private List<Path> compactInternal(StripeMultiFileWriter mw, CompactionRequest request,
+ private List<Path> compactInternal(StripeMultiFileWriter mw, final CompactionRequest request,
byte[] majorRangeFromRow, byte[] majorRangeToRow,
CompactionThroughputController throughputController) throws IOException {
final Collection<StoreFile> filesToCompact = request.getFiles();
@@ -89,7 +89,8 @@ public class StripeCompactor extends Compactor {
this.progress = new CompactionProgress(fd.maxKeyCount);
long smallestReadPoint = getSmallestReadPoint();
- List<StoreFileScanner> scanners = createFileScanners(filesToCompact, smallestReadPoint);
+ List<StoreFileScanner> scanners = createFileScanners(filesToCompact,
+ smallestReadPoint, store.throttleCompaction(request.getSize()));
boolean finished = false;
InternalScanner scanner = null;
@@ -124,7 +125,8 @@ public class StripeCompactor extends Compactor {
@Override
public Writer createWriter() throws IOException {
return store.createWriterInTmp(
- fd.maxKeyCount, compression, true, needMvcc, fd.maxTagsLength > 0);
+ fd.maxKeyCount, compression, true, needMvcc, fd.maxTagsLength > 0,
+ store.throttleCompaction(request.getSize()));
}
};
http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
index a025e6c..d2bfa7e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
@@ -252,7 +252,8 @@ public class TestCacheOnWrite {
cacheConf =
new CacheConfig(blockCache, true, true, cowType.shouldBeCached(BlockType.DATA),
cowType.shouldBeCached(BlockType.LEAF_INDEX),
- cowType.shouldBeCached(BlockType.BLOOM_CHUNK), false, cacheCompressedData, false, false);
+ cowType.shouldBeCached(BlockType.BLOOM_CHUNK), false, cacheCompressedData,
+ false, false, false);
}
@After
http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java
index 4e97738..b84f0d1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java
@@ -142,7 +142,7 @@ public class TestFSErrorsExposed {
cacheConf, BloomType.NONE);
List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(
- Collections.singletonList(sf), false, true, false,
+ Collections.singletonList(sf), false, true, false, false,
// 0 is passed as readpoint because this test operates on StoreFile directly
0);
KeyValueScanner scanner = scanners.get(0);
http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
index eecacbe..9ed5d97 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
@@ -104,14 +104,14 @@ public class TestReversibleScanners {
TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE);
List<StoreFileScanner> scanners = StoreFileScanner
- .getScannersForStoreFiles(Collections.singletonList(sf), false, true,
- false, Long.MAX_VALUE);
+ .getScannersForStoreFiles(Collections.singletonList(sf),
+ false, true, false, false, Long.MAX_VALUE);
StoreFileScanner scanner = scanners.get(0);
seekTestOfReversibleKeyValueScanner(scanner);
for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
LOG.info("Setting read point to " + readPoint);
scanners = StoreFileScanner.getScannersForStoreFiles(
- Collections.singletonList(sf), false, true, false, readPoint);
+ Collections.singletonList(sf), false, true, false, false, readPoint);
seekTestOfReversibleKeyValueScannerWithMVCC(scanners.get(0), readPoint);
}
}
@@ -493,7 +493,7 @@ public class TestReversibleScanners {
throws IOException {
List<StoreFileScanner> fileScanners = StoreFileScanner
.getScannersForStoreFiles(Lists.newArrayList(sf1, sf2), false, true,
- false, readPoint);
+ false, false, readPoint);
List<KeyValueScanner> memScanners = memstore.getScanners(readPoint);
List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(
fileScanners.size() + 1);
http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
index 50ee131..110eade 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
@@ -195,7 +195,7 @@ public class TestStripeCompactor {
when(store.getFileSystem()).thenReturn(mock(FileSystem.class));
when(store.getRegionInfo()).thenReturn(new HRegionInfo(TABLE_NAME));
when(store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class),
- anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
+ anyBoolean(), anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
when(store.getComparator()).thenReturn(new KVComparator());
return new StripeCompactor(conf, store) {
@@ -226,6 +226,7 @@ public class TestStripeCompactor {
.thenReturn(mock(StoreFileScanner.class));
when(sf.getReader()).thenReturn(r);
when(sf.createReader()).thenReturn(r);
+ when(sf.createReader(anyBoolean())).thenReturn(r);
return new CompactionRequest(Arrays.asList(sf));
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
index 5e62af1..a11bd70 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
@@ -724,6 +724,7 @@ public class TestStripeCompactionPolicy {
when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong())).thenReturn(
mock(StoreFileScanner.class));
when(sf.getReader()).thenReturn(r);
+ when(sf.createReader(anyBoolean())).thenReturn(r);
when(sf.createReader()).thenReturn(r);
return sf;
}
@@ -747,7 +748,7 @@ public class TestStripeCompactionPolicy {
when(store.getRegionInfo()).thenReturn(info);
when(
store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class), anyBoolean(),
- anyBoolean(), anyBoolean())).thenAnswer(writers);
+ anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
Configuration conf = HBaseConfiguration.create();
final Scanner scanner = new Scanner();