You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2017/09/25 01:58:51 UTC
[09/11] hbase git commit: HBASE-18825 Use HStoreFile instead of
StoreFile in our own code base and remove unnecessary methods in StoreFile
interface
http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index daad241..de41087 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -31,6 +31,7 @@ import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Optional;
+import java.util.OptionalDouble;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
@@ -43,6 +44,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;
+import java.util.function.ToLongFunction;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -171,7 +175,7 @@ public class HStore implements Store {
private ScanInfo scanInfo;
// TODO: ideally, this should be part of storeFileManager, as we keep passing this to it.
- final List<StoreFile> filesCompacting = Lists.newArrayList();
+ final List<HStoreFile> filesCompacting = Lists.newArrayList();
// All access must be synchronized.
private final Set<ChangedReadersObserver> changedReaderObservers =
@@ -335,7 +339,7 @@ public class HStore implements Store {
* @param kvComparator KVComparator for storeFileManager.
* @return StoreEngine to use.
*/
- protected StoreEngine<?, ?, ?, ?> createStoreEngine(Store store, Configuration conf,
+ protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf,
CellComparator kvComparator) throws IOException {
return StoreEngine.create(store, conf, comparator);
}
@@ -517,12 +521,12 @@ public class HStore implements Store {
* from the given directory.
* @throws IOException
*/
- private List<StoreFile> loadStoreFiles() throws IOException {
+ private List<HStoreFile> loadStoreFiles() throws IOException {
Collection<StoreFileInfo> files = fs.getStoreFiles(getColumnFamilyName());
return openStoreFiles(files);
}
- private List<StoreFile> openStoreFiles(Collection<StoreFileInfo> files) throws IOException {
+ private List<HStoreFile> openStoreFiles(Collection<StoreFileInfo> files) throws IOException {
if (files == null || files.isEmpty()) {
return new ArrayList<>();
}
@@ -530,28 +534,21 @@ public class HStore implements Store {
ThreadPoolExecutor storeFileOpenerThreadPool =
this.region.getStoreFileOpenAndCloseThreadPool("StoreFileOpenerThread-" +
this.getColumnFamilyName());
- CompletionService<StoreFile> completionService = new ExecutorCompletionService<>(storeFileOpenerThreadPool);
+ CompletionService<HStoreFile> completionService = new ExecutorCompletionService<>(storeFileOpenerThreadPool);
int totalValidStoreFile = 0;
- for (final StoreFileInfo storeFileInfo: files) {
+ for (StoreFileInfo storeFileInfo : files) {
// open each store file in parallel
- completionService.submit(new Callable<StoreFile>() {
- @Override
- public StoreFile call() throws IOException {
- StoreFile storeFile = createStoreFileAndReader(storeFileInfo);
- return storeFile;
- }
- });
+ completionService.submit(() -> this.createStoreFileAndReader(storeFileInfo));
totalValidStoreFile++;
}
- ArrayList<StoreFile> results = new ArrayList<>(files.size());
+ ArrayList<HStoreFile> results = new ArrayList<>(files.size());
IOException ioe = null;
try {
for (int i = 0; i < totalValidStoreFile; i++) {
try {
- Future<StoreFile> future = completionService.take();
- StoreFile storeFile = future.get();
+ HStoreFile storeFile = completionService.take().get();
if (storeFile != null) {
long length = storeFile.getReader().length();
this.storeSize += length;
@@ -574,9 +571,9 @@ public class HStore implements Store {
// close StoreFile readers
boolean evictOnClose =
cacheConf != null? cacheConf.shouldEvictOnClose(): true;
- for (StoreFile file : results) {
+ for (HStoreFile file : results) {
try {
- if (file != null) file.closeReader(evictOnClose);
+ if (file != null) file.closeStoreFile(evictOnClose);
} catch (IOException e) {
LOG.warn(e.getMessage());
}
@@ -618,19 +615,18 @@ public class HStore implements Store {
*/
private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) throws IOException {
StoreFileManager sfm = storeEngine.getStoreFileManager();
- Collection<StoreFile> currentFiles = sfm.getStorefiles();
- Collection<StoreFile> compactedFiles = sfm.getCompactedfiles();
+ Collection<HStoreFile> currentFiles = sfm.getStorefiles();
+ Collection<HStoreFile> compactedFiles = sfm.getCompactedfiles();
if (currentFiles == null) currentFiles = Collections.emptySet();
if (newFiles == null) newFiles = Collections.emptySet();
if (compactedFiles == null) compactedFiles = Collections.emptySet();
- HashMap<StoreFileInfo, StoreFile> currentFilesSet = new HashMap<>(currentFiles.size());
- for (StoreFile sf : currentFiles) {
+ HashMap<StoreFileInfo, HStoreFile> currentFilesSet = new HashMap<>(currentFiles.size());
+ for (HStoreFile sf : currentFiles) {
currentFilesSet.put(sf.getFileInfo(), sf);
}
- HashMap<StoreFileInfo, StoreFile> compactedFilesSet =
- new HashMap<StoreFileInfo, StoreFile>(compactedFiles.size());
- for (StoreFile sf : compactedFiles) {
+ HashMap<StoreFileInfo, HStoreFile> compactedFilesSet = new HashMap<>(compactedFiles.size());
+ for (HStoreFile sf : compactedFiles) {
compactedFilesSet.put(sf.getFileInfo(), sf);
}
@@ -647,13 +643,13 @@ public class HStore implements Store {
LOG.info("Refreshing store files for region " + this.getRegionInfo().getRegionNameAsString()
+ " files to add: " + toBeAddedFiles + " files to remove: " + toBeRemovedFiles);
- Set<StoreFile> toBeRemovedStoreFiles = new HashSet<>(toBeRemovedFiles.size());
+ Set<HStoreFile> toBeRemovedStoreFiles = new HashSet<>(toBeRemovedFiles.size());
for (StoreFileInfo sfi : toBeRemovedFiles) {
toBeRemovedStoreFiles.add(currentFilesSet.get(sfi));
}
// try to open the files
- List<StoreFile> openedFiles = openStoreFiles(toBeAddedFiles);
+ List<HStoreFile> openedFiles = openStoreFiles(toBeAddedFiles);
// propogate the file changes to the underlying store file manager
replaceStoreFiles(toBeRemovedStoreFiles, openedFiles); //won't throw an exception
@@ -668,14 +664,14 @@ public class HStore implements Store {
completeCompaction(toBeRemovedStoreFiles);
}
- private StoreFile createStoreFileAndReader(final Path p) throws IOException {
+ private HStoreFile createStoreFileAndReader(final Path p) throws IOException {
StoreFileInfo info = new StoreFileInfo(conf, this.getFileSystem(), p);
return createStoreFileAndReader(info);
}
- private StoreFile createStoreFileAndReader(final StoreFileInfo info) throws IOException {
+ private HStoreFile createStoreFileAndReader(StoreFileInfo info) throws IOException {
info.setRegionCoprocessorHost(this.region.getCoprocessorHost());
- StoreFile storeFile = new HStoreFile(this.getFileSystem(), info, this.conf, this.cacheConf,
+ HStoreFile storeFile = new HStoreFile(this.getFileSystem(), info, this.conf, this.cacheConf,
this.family.getBloomFilterType(), isPrimaryReplicaStore());
storeFile.initReader();
return storeFile;
@@ -734,12 +730,12 @@ public class HStore implements Store {
* @return All store files.
*/
@Override
- public Collection<StoreFile> getStorefiles() {
+ public Collection<HStoreFile> getStorefiles() {
return this.storeEngine.getStoreFileManager().getStorefiles();
}
@Override
- public Collection<StoreFile> getCompactedFiles() {
+ public Collection<HStoreFile> getCompactedFiles() {
return this.storeEngine.getStoreFileManager().getCompactedfiles();
}
@@ -756,19 +752,19 @@ public class HStore implements Store {
isPrimaryReplicaStore(), conf);
reader.loadFileInfo();
- byte[] firstKey = reader.getFirstRowKey();
- Preconditions.checkState(firstKey != null, "First key can not be null");
- Cell lk = reader.getLastKey();
- Preconditions.checkState(lk != null, "Last key can not be null");
- byte[] lastKey = CellUtil.cloneRow(lk);
+ Optional<byte[]> firstKey = reader.getFirstRowKey();
+ Preconditions.checkState(firstKey.isPresent(), "First key can not be null");
+ Optional<Cell> lk = reader.getLastKey();
+ Preconditions.checkState(lk.isPresent(), "Last key can not be null");
+ byte[] lastKey = CellUtil.cloneRow(lk.get());
- LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey) +
+ LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey.get()) +
" last=" + Bytes.toStringBinary(lastKey));
LOG.debug("Region bounds: first=" +
Bytes.toStringBinary(getRegionInfo().getStartKey()) +
" last=" + Bytes.toStringBinary(getRegionInfo().getEndKey()));
- if (!this.getRegionInfo().containsRange(firstKey, lastKey)) {
+ if (!this.getRegionInfo().containsRange(firstKey.get(), lastKey)) {
throw new WrongRegionException(
"Bulk load file " + srcPath.toString() + " does not fit inside region "
+ this.getRegionInfo().getRegionNameAsString());
@@ -842,7 +838,7 @@ public class HStore implements Store {
LOG.info("Loaded HFile " + srcPath + " into store '" + getColumnFamilyName() + "' as "
+ dstPath + " - updating store file list.");
- StoreFile sf = createStoreFileAndReader(dstPath);
+ HStoreFile sf = createStoreFileAndReader(dstPath);
bulkLoadHFile(sf);
LOG.info("Successfully loaded store file " + srcPath + " into store " + this
@@ -852,11 +848,11 @@ public class HStore implements Store {
}
public void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException {
- StoreFile sf = createStoreFileAndReader(fileInfo);
+ HStoreFile sf = createStoreFileAndReader(fileInfo);
bulkLoadHFile(sf);
}
- private void bulkLoadHFile(StoreFile sf) throws IOException {
+ private void bulkLoadHFile(HStoreFile sf) throws IOException {
StoreFileReader r = sf.getReader();
this.storeSize += r.length();
this.totalUncompressedBytes += r.getTotalUncompressedBytes();
@@ -883,13 +879,13 @@ public class HStore implements Store {
}
@Override
- public ImmutableCollection<StoreFile> close() throws IOException {
+ public ImmutableCollection<HStoreFile> close() throws IOException {
this.archiveLock.lock();
this.lock.writeLock().lock();
try {
// Clear so metrics doesn't find them.
- ImmutableCollection<StoreFile> result = storeEngine.getStoreFileManager().clearFiles();
- Collection<StoreFile> compactedfiles =
+ ImmutableCollection<HStoreFile> result = storeEngine.getStoreFileManager().clearFiles();
+ Collection<HStoreFile> compactedfiles =
storeEngine.getStoreFileManager().clearCompactedFiles();
// clear the compacted files
if (compactedfiles != null && !compactedfiles.isEmpty()) {
@@ -904,13 +900,13 @@ public class HStore implements Store {
// close each store file in parallel
CompletionService<Void> completionService =
new ExecutorCompletionService<>(storeFileCloserThreadPool);
- for (final StoreFile f : result) {
+ for (HStoreFile f : result) {
completionService.submit(new Callable<Void>() {
@Override
public Void call() throws IOException {
boolean evictOnClose =
cacheConf != null? cacheConf.shouldEvictOnClose(): true;
- f.closeReader(evictOnClose);
+ f.closeStoreFile(evictOnClose);
return null;
}
});
@@ -1012,20 +1008,20 @@ public class HStore implements Store {
throw lastException;
}
- /*
+ /**
* @param path The pathname of the tmp file into which the store was flushed
* @param logCacheFlushId
* @param status
- * @return StoreFile created.
+ * @return store file created.
* @throws IOException
*/
- private StoreFile commitFile(final Path path, final long logCacheFlushId, MonitoredTask status)
+ private HStoreFile commitFile(Path path, long logCacheFlushId, MonitoredTask status)
throws IOException {
// Write-out finished successfully, move into the right spot
Path dstPath = fs.commitStoreFile(getColumnFamilyName(), path);
status.setStatus("Flushing " + this + ": reopening flushed file");
- StoreFile sf = createStoreFileAndReader(dstPath);
+ HStoreFile sf = createStoreFileAndReader(dstPath);
StoreFileReader r = sf.getReader();
this.storeSize += r.length();
@@ -1041,35 +1037,32 @@ public class HStore implements Store {
@Override
public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
- boolean isCompaction, boolean includeMVCCReadpoint,
- boolean includesTag)
- throws IOException {
+ boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag) throws IOException {
return createWriterInTmp(maxKeyCount, compression, isCompaction, includeMVCCReadpoint,
- includesTag, false);
+ includesTag, false);
}
- /*
+ /**
* @param maxKeyCount
* @param compression Compression algorithm to use
* @param isCompaction whether we are creating a new file in a compaction
- * @param includesMVCCReadPoint - whether to include MVCC or not
+ * @param includeMVCCReadpoint - whether to include MVCC or not
* @param includesTag - includesTag or not
* @return Writer for a new StoreFile in the tmp dir.
*/
@Override
public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
- boolean shouldDropBehind)
- throws IOException {
+ boolean shouldDropBehind) throws IOException {
return createWriterInTmp(maxKeyCount, compression, isCompaction, includeMVCCReadpoint,
- includesTag, shouldDropBehind, null);
+ includesTag, shouldDropBehind, null);
}
- /*
+ /**
* @param maxKeyCount
* @param compression Compression algorithm to use
* @param isCompaction whether we are creating a new file in a compaction
- * @param includesMVCCReadPoint - whether to include MVCC or not
+ * @param includeMVCCReadpoint - whether to include MVCC or not
* @param includesTag - includesTag or not
* @return Writer for a new StoreFile in the tmp dir.
*/
@@ -1078,8 +1071,7 @@ public class HStore implements Store {
@Override
public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
- boolean shouldDropBehind, final TimeRangeTracker trt)
- throws IOException {
+ boolean shouldDropBehind, final TimeRangeTracker trt) throws IOException {
final CacheConfig writerCacheConf;
if (isCompaction) {
// Don't cache data on write on compactions.
@@ -1133,15 +1125,18 @@ public class HStore implements Store {
}
- /*
+ private long getTotalSize(Collection<HStoreFile> sfs) {
+ return sfs.stream().mapToLong(sf -> sf.getReader().length()).sum();
+ }
+
+ /**
* Change storeFiles adding into place the Reader produced by this new flush.
* @param sfs Store files
* @param snapshotId
* @throws IOException
* @return Whether compaction is required.
*/
- private boolean updateStorefiles(final List<StoreFile> sfs, final long snapshotId)
- throws IOException {
+ private boolean updateStorefiles(List<HStoreFile> sfs, long snapshotId) throws IOException {
this.lock.writeLock().lock();
try {
this.storeEngine.getStoreFileManager().insertNewFiles(sfs);
@@ -1159,10 +1154,7 @@ public class HStore implements Store {
// notify to be called here - only in case of flushes
notifyChangedReadersObservers(sfs);
if (LOG.isTraceEnabled()) {
- long totalSize = 0;
- for (StoreFile sf : sfs) {
- totalSize += sf.getReader().length();
- }
+ long totalSize = getTotalSize(sfs);
String traceMessage = "FLUSH time,count,size,store size,store files ["
+ EnvironmentEdgeManager.currentTime() + "," + sfs.size() + "," + totalSize
+ "," + storeSize + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
@@ -1171,11 +1163,11 @@ public class HStore implements Store {
return needsCompaction();
}
- /*
+ /**
* Notify all observers that set of Readers has changed.
* @throws IOException
*/
- private void notifyChangedReadersObservers(List<StoreFile> sfs) throws IOException {
+ private void notifyChangedReadersObservers(List<HStoreFile> sfs) throws IOException {
for (ChangedReadersObserver o : this.changedReaderObservers) {
List<KeyValueScanner> memStoreScanners;
this.lock.readLock().lock();
@@ -1190,13 +1182,39 @@ public class HStore implements Store {
/**
* Get all scanners with no filtering based on TTL (that happens further down the line).
+ * @param cacheBlocks cache the blocks or not
+ * @param usePread true to use pread, false if not
+ * @param isCompaction true if the scanner is created for compaction
+ * @param matcher the scan query matcher
+ * @param startRow the start row
+ * @param stopRow the stop row
+ * @param readPt the read point of the current scan
+ * @return all scanners for this store
+ */
+ public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean isGet, boolean usePread,
+ boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, byte[] stopRow, long readPt)
+ throws IOException {
+ return getScanners(cacheBlocks, usePread, isCompaction, matcher, startRow, true, stopRow, false,
+ readPt);
+ }
+
+ /**
+ * Get all scanners with no filtering based on TTL (that happens further down the line).
+ * @param cacheBlocks cache the blocks or not
+ * @param usePread true to use pread, false if not
+ * @param isCompaction true if the scanner is created for compaction
+ * @param matcher the scan query matcher
+ * @param startRow the start row
+ * @param includeStartRow true to include start row, false if not
+ * @param stopRow the stop row
+ * @param includeStopRow true to include stop row, false if not
+ * @param readPt the read point of the current scan
* @return all scanners for this store
*/
- @Override
public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean usePread,
boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, boolean includeStartRow,
byte[] stopRow, boolean includeStopRow, long readPt) throws IOException {
- Collection<StoreFile> storeFilesToScan;
+ Collection<HStoreFile> storeFilesToScan;
List<KeyValueScanner> memStoreScanners;
this.lock.readLock().lock();
try {
@@ -1221,8 +1239,45 @@ public class HStore implements Store {
return scanners;
}
- @Override
- public List<KeyValueScanner> getScanners(List<StoreFile> files, boolean cacheBlocks,
+ /**
+ * Create scanners on the given files and if needed on the memstore with no filtering based on TTL
+ * (that happens further down the line).
+ * @param files the list of files on which the scanners has to be created
+ * @param cacheBlocks cache the blocks or not
+ * @param usePread true to use pread, false if not
+ * @param isCompaction true if the scanner is created for compaction
+ * @param matcher the scan query matcher
+ * @param startRow the start row
+ * @param stopRow the stop row
+ * @param readPt the read point of the current scan
+ * @param includeMemstoreScanner true if memstore has to be included
+ * @return scanners on the given files and on the memstore if specified
+ */
+ public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
+ boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
+ byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner)
+ throws IOException {
+ return getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true, stopRow,
+ false, readPt, includeMemstoreScanner);
+ }
+
+ /**
+ * Create scanners on the given files and if needed on the memstore with no filtering based on TTL
+ * (that happens further down the line).
+ * @param files the list of files on which the scanners has to be created
+ * @param cacheBlocks ache the blocks or not
+ * @param usePread true to use pread, false if not
+ * @param isCompaction true if the scanner is created for compaction
+ * @param matcher the scan query matcher
+ * @param startRow the start row
+ * @param includeStartRow true to include start row, false if not
+ * @param stopRow the stop row
+ * @param includeStopRow true to include stop row, false if not
+ * @param readPt the read point of the current scan
+ * @param includeMemstoreScanner true if memstore has to be included
+ * @return scanners on the given files and on the memstore if specified
+ */
+ public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
boolean includeMemstoreScanner) throws IOException {
@@ -1305,16 +1360,16 @@ public class HStore implements Store {
* @return Storefile we compacted into or null if we failed or opted out early.
*/
@Override
- public List<StoreFile> compact(CompactionContext compaction,
+ public List<HStoreFile> compact(CompactionContext compaction,
ThroughputController throughputController) throws IOException {
return compact(compaction, throughputController, null);
}
@Override
- public List<StoreFile> compact(CompactionContext compaction,
+ public List<HStoreFile> compact(CompactionContext compaction,
ThroughputController throughputController, User user) throws IOException {
assert compaction != null;
- List<StoreFile> sfs = null;
+ List<HStoreFile> sfs = null;
CompactionRequest cr = compaction.getRequest();
try {
// Do all sanity checking in here if we have a valid CompactionRequest
@@ -1322,7 +1377,7 @@ public class HStore implements Store {
// block below
long compactionStartTime = EnvironmentEdgeManager.currentTime();
assert compaction.hasSelection();
- Collection<StoreFile> filesToCompact = cr.getFiles();
+ Collection<HStoreFile> filesToCompact = cr.getFiles();
assert !filesToCompact.isEmpty();
synchronized (filesCompacting) {
// sanity check: we're compacting files that this store knows about
@@ -1338,7 +1393,6 @@ public class HStore implements Store {
// Commence the compaction.
List<Path> newFiles = compaction.compact(throughputController, user);
- long outputBytes = 0L;
// TODO: get rid of this!
if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
LOG.warn("hbase.hstore.compaction.complete is set to false");
@@ -1347,8 +1401,8 @@ public class HStore implements Store {
cacheConf != null? cacheConf.shouldEvictOnClose(): true;
for (Path newFile : newFiles) {
// Create storefile around what we wrote with a reader on it.
- StoreFile sf = createStoreFileAndReader(newFile);
- sf.closeReader(evictOnClose);
+ HStoreFile sf = createStoreFileAndReader(newFile);
+ sf.closeStoreFile(evictOnClose);
sfs.add(sf);
}
return sfs;
@@ -1364,10 +1418,7 @@ public class HStore implements Store {
compactedCellsCount += getCompactionProgress().totalCompactingKVs;
compactedCellsSize += getCompactionProgress().totalCompactedSize;
}
-
- for (StoreFile sf : sfs) {
- outputBytes += sf.getReader().length();
- }
+ long outputBytes = getTotalSize(sfs);
// At this point the store will use new files for all new scanners.
completeCompaction(filesToCompact); // update store size.
@@ -1387,12 +1438,12 @@ public class HStore implements Store {
}
}
- private List<StoreFile> moveCompatedFilesIntoPlace(CompactionRequest cr, List<Path> newFiles,
+ private List<HStoreFile> moveCompatedFilesIntoPlace(CompactionRequest cr, List<Path> newFiles,
User user) throws IOException {
- List<StoreFile> sfs = new ArrayList<>(newFiles.size());
+ List<HStoreFile> sfs = new ArrayList<>(newFiles.size());
for (Path newFile : newFiles) {
assert newFile != null;
- StoreFile sf = moveFileIntoPlace(newFile);
+ HStoreFile sf = moveFileIntoPlace(newFile);
if (this.getCoprocessorHost() != null) {
getCoprocessorHost().postCompact(this, sf, cr.getTracker(), user);
}
@@ -1403,7 +1454,7 @@ public class HStore implements Store {
}
// Package-visible for tests
- StoreFile moveFileIntoPlace(final Path newFile) throws IOException {
+ HStoreFile moveFileIntoPlace(Path newFile) throws IOException {
validateStoreFile(newFile);
// Move the file into the right spot
Path destPath = fs.commitStoreFile(getColumnFamilyName(), newFile);
@@ -1415,17 +1466,15 @@ public class HStore implements Store {
* @param filesCompacted Files compacted (input).
* @param newFiles Files from compaction.
*/
- private void writeCompactionWalRecord(Collection<StoreFile> filesCompacted,
- Collection<StoreFile> newFiles) throws IOException {
- if (region.getWAL() == null) return;
- List<Path> inputPaths = new ArrayList<>(filesCompacted.size());
- for (StoreFile f : filesCompacted) {
- inputPaths.add(f.getPath());
- }
- List<Path> outputPaths = new ArrayList<>(newFiles.size());
- for (StoreFile f : newFiles) {
- outputPaths.add(f.getPath());
+ private void writeCompactionWalRecord(Collection<HStoreFile> filesCompacted,
+ Collection<HStoreFile> newFiles) throws IOException {
+ if (region.getWAL() == null) {
+ return;
}
+ List<Path> inputPaths =
+ filesCompacted.stream().map(HStoreFile::getPath).collect(Collectors.toList());
+ List<Path> outputPaths =
+ newFiles.stream().map(HStoreFile::getPath).collect(Collectors.toList());
HRegionInfo info = this.region.getRegionInfo();
CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info,
family.getName(), inputPaths, outputPaths, fs.getStoreDir(getColumnFamilyDescriptor().getNameAsString()));
@@ -1437,8 +1486,8 @@ public class HStore implements Store {
}
@VisibleForTesting
- void replaceStoreFiles(final Collection<StoreFile> compactedFiles,
- final Collection<StoreFile> result) throws IOException {
+ void replaceStoreFiles(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> result)
+ throws IOException {
this.lock.writeLock().lock();
try {
this.storeEngine.getStoreFileManager().addCompactionResults(compactedFiles, result);
@@ -1455,7 +1504,7 @@ public class HStore implements Store {
* @param compactionStartTime Start time.
*/
private void logCompactionEndMessage(
- CompactionRequest cr, List<StoreFile> sfs, long now, long compactionStartTime) {
+ CompactionRequest cr, List<HStoreFile> sfs, long now, long compactionStartTime) {
StringBuilder message = new StringBuilder(
"Completed" + (cr.isMajor() ? " major" : "") + " compaction of "
+ cr.getFiles().size() + (cr.isAllFiles() ? " (all)" : "") + " file(s) in "
@@ -1463,7 +1512,7 @@ public class HStore implements Store {
if (sfs.isEmpty()) {
message.append("none, ");
} else {
- for (StoreFile sf: sfs) {
+ for (HStoreFile sf: sfs) {
message.append(sf.getPath().getName());
message.append("(size=");
message.append(TraditionalBinaryPrefix.long2String(sf.getReader().length(), "", 1));
@@ -1479,10 +1528,7 @@ public class HStore implements Store {
LOG.info(message.toString());
if (LOG.isTraceEnabled()) {
int fileCount = storeEngine.getStoreFileManager().getStorefileCount();
- long resultSize = 0;
- for (StoreFile sf : sfs) {
- resultSize += sf.getReader().length();
- }
+ long resultSize = getTotalSize(sfs);
String traceMessage = "COMPACTION start,end,size out,files in,files out,store size,"
+ "store files [" + compactionStartTime + "," + now + "," + resultSize + ","
+ cr.getFiles().size() + "," + sfs.size() + "," + storeSize + "," + fileCount + "]";
@@ -1496,9 +1542,8 @@ public class HStore implements Store {
* See HBASE-2231.
* @param compaction
*/
- public void replayCompactionMarker(CompactionDescriptor compaction,
- boolean pickCompactionFiles, boolean removeFiles)
- throws IOException {
+ public void replayCompactionMarker(CompactionDescriptor compaction, boolean pickCompactionFiles,
+ boolean removeFiles) throws IOException {
LOG.debug("Completing compaction from the WAL marker");
List<String> compactionInputs = compaction.getCompactionInputList();
List<String> compactionOutputs = Lists.newArrayList(compaction.getCompactionOutputList());
@@ -1525,23 +1570,23 @@ public class HStore implements Store {
}
//some of the input files might already be deleted
- List<StoreFile> inputStoreFiles = new ArrayList<>(compactionInputs.size());
- for (StoreFile sf : this.getStorefiles()) {
+ List<HStoreFile> inputStoreFiles = new ArrayList<>(compactionInputs.size());
+ for (HStoreFile sf : this.getStorefiles()) {
if (inputFiles.contains(sf.getPath().getName())) {
inputStoreFiles.add(sf);
}
}
// check whether we need to pick up the new files
- List<StoreFile> outputStoreFiles = new ArrayList<>(compactionOutputs.size());
+ List<HStoreFile> outputStoreFiles = new ArrayList<>(compactionOutputs.size());
if (pickCompactionFiles) {
- for (StoreFile sf : this.getStorefiles()) {
+ for (HStoreFile sf : this.getStorefiles()) {
compactionOutputs.remove(sf.getPath().getName());
}
for (String compactionOutput : compactionOutputs) {
StoreFileInfo storeFileInfo = fs.getStoreFileInfo(getColumnFamilyName(), compactionOutput);
- StoreFile storeFile = createStoreFileAndReader(storeFileInfo);
+ HStoreFile storeFile = createStoreFileAndReader(storeFileInfo);
outputStoreFiles.add(storeFile);
}
}
@@ -1561,8 +1606,9 @@ public class HStore implements Store {
* but instead makes a compaction candidate list by itself.
* @param N Number of files.
*/
+ @VisibleForTesting
public void compactRecentForTestingAssumingDefaultPolicy(int N) throws IOException {
- List<StoreFile> filesToCompact;
+ List<HStoreFile> filesToCompact;
boolean isMajor;
this.lock.readLock().lock();
@@ -1572,7 +1618,7 @@ public class HStore implements Store {
if (!filesCompacting.isEmpty()) {
// exclude all files older than the newest file we're currently
// compacting. this allows us to preserve contiguity (HBASE-2856)
- StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
+ HStoreFile last = filesCompacting.get(filesCompacting.size() - 1);
int idx = filesToCompact.indexOf(last);
Preconditions.checkArgument(idx != -1);
filesToCompact.subList(0, idx + 1).clear();
@@ -1598,11 +1644,11 @@ public class HStore implements Store {
.compactForTesting(filesToCompact, isMajor);
for (Path newFile: newFiles) {
// Move the compaction into place.
- StoreFile sf = moveFileIntoPlace(newFile);
+ HStoreFile sf = moveFileIntoPlace(newFile);
if (this.getCoprocessorHost() != null) {
this.getCoprocessorHost().postCompact(this, sf, null, null);
}
- replaceStoreFiles(filesToCompact, Lists.newArrayList(sf));
+ replaceStoreFiles(filesToCompact, Collections.singletonList(sf));
completeCompaction(filesToCompact);
}
} finally {
@@ -1624,7 +1670,7 @@ public class HStore implements Store {
@Override
public boolean isMajorCompaction() throws IOException {
- for (StoreFile sf : this.storeEngine.getStoreFileManager().getStorefiles()) {
+ for (HStoreFile sf : this.storeEngine.getStoreFileManager().getStorefiles()) {
// TODO: what are these reader checks all over the place?
if (sf.getReader() == null) {
LOG.debug("StoreFile " + sf + " has null Reader");
@@ -1652,7 +1698,7 @@ public class HStore implements Store {
synchronized (filesCompacting) {
// First, see if coprocessor would want to override selection.
if (this.getCoprocessorHost() != null) {
- final List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
+ final List<HStoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
boolean override = false;
override = getCoprocessorHost().preCompactSelection(this, candidatesForCoproc,
tracker, user);
@@ -1688,7 +1734,7 @@ public class HStore implements Store {
}
// Finally, we have the resulting files list. Check if we have any files at all.
request = compaction.getRequest();
- Collection<StoreFile> selectedFiles = request.getFiles();
+ Collection<HStoreFile> selectedFiles = request.getFiles();
if (selectedFiles.isEmpty()) {
return Optional.empty();
}
@@ -1716,7 +1762,7 @@ public class HStore implements Store {
}
/** Adds the files to compacting files. filesCompacting must be locked. */
- private void addToCompactingFiles(final Collection<StoreFile> filesToAdd) {
+ private void addToCompactingFiles(Collection<HStoreFile> filesToAdd) {
if (filesToAdd == null) return;
// Check that we do not try to compact the same StoreFile twice.
if (!Collections.disjoint(filesCompacting, filesToAdd)) {
@@ -1734,7 +1780,7 @@ public class HStore implements Store {
return;
}
this.lock.readLock().lock();
- Collection<StoreFile> delSfs = null;
+ Collection<HStoreFile> delSfs = null;
try {
synchronized (filesCompacting) {
long cfTtl = getStoreFileTtl();
@@ -1749,7 +1795,7 @@ public class HStore implements Store {
}
if (delSfs == null || delSfs.isEmpty()) return;
- Collection<StoreFile> newFiles = new ArrayList<>(); // No new files.
+ Collection<HStoreFile> newFiles = new ArrayList<>(); // No new files.
writeCompactionWalRecord(delSfs, newFiles);
replaceStoreFiles(delSfs, newFiles);
completeCompaction(delSfs);
@@ -1775,23 +1821,20 @@ public class HStore implements Store {
}
/**
- * Validates a store file by opening and closing it. In HFileV2 this should
- * not be an expensive operation.
- *
+ * Validates a store file by opening and closing it. In HFileV2 this should not be an expensive
+ * operation.
* @param path the path to the store file
*/
- private void validateStoreFile(Path path)
- throws IOException {
- StoreFile storeFile = null;
+ private void validateStoreFile(Path path) throws IOException {
+ HStoreFile storeFile = null;
try {
storeFile = createStoreFileAndReader(path);
} catch (IOException e) {
- LOG.error("Failed to open store file : " + path
- + ", keeping it in tmp location", e);
+ LOG.error("Failed to open store file : " + path + ", keeping it in tmp location", e);
throw e;
} finally {
if (storeFile != null) {
- storeFile.closeReader(false);
+ storeFile.closeStoreFile(false);
}
}
}
@@ -1811,11 +1854,11 @@ public class HStore implements Store {
* @param compactedFiles list of files that were compacted
*/
@VisibleForTesting
- protected void completeCompaction(final Collection<StoreFile> compactedFiles)
+ protected void completeCompaction(Collection<HStoreFile> compactedFiles)
throws IOException {
this.storeSize = 0L;
this.totalUncompressedBytes = 0L;
- for (StoreFile hsf : this.storeEngine.getStoreFileManager().getStorefiles()) {
+ for (HStoreFile hsf : this.storeEngine.getStoreFileManager().getStorefiles()) {
StoreFileReader r = hsf.getReader();
if (r == null) {
LOG.warn("StoreFile " + hsf + " has a null Reader");
@@ -1857,7 +1900,7 @@ public class HStore implements Store {
}
@Override
- public byte[] getSplitPoint() {
+ public Optional<byte[]> getSplitPoint() {
this.lock.readLock().lock();
try {
// Should already be enforced by the split policy!
@@ -1867,7 +1910,7 @@ public class HStore implements Store {
if (LOG.isTraceEnabled()) {
LOG.trace("Not splittable; has references: " + this);
}
- return null;
+ return Optional.empty();
}
return this.storeEngine.getStoreFileManager().getSplitPoint();
} catch(IOException e) {
@@ -1875,7 +1918,7 @@ public class HStore implements Store {
} finally {
this.lock.readLock().unlock();
}
- return null;
+ return Optional.empty();
}
@Override
@@ -1924,24 +1967,39 @@ public class HStore implements Store {
return scanner;
}
- @Override
+ /**
+ * Recreates the scanners on the current list of active store file scanners
+ * @param currentFileScanners the current set of active store file scanners
+ * @param cacheBlocks cache the blocks or not
+ * @param usePread use pread or not
+ * @param isCompaction is the scanner for compaction
+ * @param matcher the scan query matcher
+ * @param startRow the scan's start row
+ * @param includeStartRow should the scan include the start row
+ * @param stopRow the scan's stop row
+ * @param includeStopRow should the scan include the stop row
+ * @param readPt the read point of the current scane
+ * @param includeMemstoreScanner whether the current scanner should include memstorescanner
+ * @return list of scanners recreated on the current Scanners
+ * @throws IOException
+ */
public List<KeyValueScanner> recreateScanners(List<KeyValueScanner> currentFileScanners,
boolean cacheBlocks, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
boolean includeMemstoreScanner) throws IOException {
this.lock.readLock().lock();
try {
- Map<String, StoreFile> name2File =
+ Map<String, HStoreFile> name2File =
new HashMap<>(getStorefilesCount() + getCompactedFilesCount());
- for (StoreFile file : getStorefiles()) {
+ for (HStoreFile file : getStorefiles()) {
name2File.put(file.getFileInfo().getActiveFileName(), file);
}
if (getCompactedFiles() != null) {
- for (StoreFile file : getCompactedFiles()) {
+ for (HStoreFile file : getCompactedFiles()) {
name2File.put(file.getFileInfo().getActiveFileName(), file);
}
}
- List<StoreFile> filesToReopen = new ArrayList<>();
+ List<HStoreFile> filesToReopen = new ArrayList<>();
for (KeyValueScanner kvs : currentFileScanners) {
assert kvs.isFileScanner();
if (kvs.peek() == null) {
@@ -1974,87 +2032,45 @@ public class HStore implements Store {
return this.storeEngine.getStoreFileManager().getCompactedFilesCount();
}
+ private LongStream getStoreFileCreatedTimestampStream() {
+ return this.storeEngine.getStoreFileManager().getStorefiles().stream().filter(sf -> {
+ if (sf.getReader() == null) {
+ LOG.warn("StoreFile " + sf + " has a null Reader");
+ return false;
+ } else {
+ return true;
+ }
+ }).filter(HStoreFile::isHFile).mapToLong(sf -> sf.getFileInfo().getCreatedTimestamp());
+ }
+
@Override
public long getMaxStoreFileAge() {
- long earliestTS = Long.MAX_VALUE;
- for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
- StoreFileReader r = s.getReader();
- if (r == null) {
- LOG.warn("StoreFile " + s + " has a null Reader");
- continue;
- }
- if (!s.isHFile()) {
- continue;
- }
- long createdTS = s.getFileInfo().getCreatedTimestamp();
- earliestTS = (createdTS < earliestTS) ? createdTS : earliestTS;
- }
- long now = EnvironmentEdgeManager.currentTime();
- return now - earliestTS;
+ return EnvironmentEdgeManager.currentTime() -
+ getStoreFileCreatedTimestampStream().min().orElse(Long.MAX_VALUE);
}
@Override
public long getMinStoreFileAge() {
- long latestTS = 0;
- for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
- StoreFileReader r = s.getReader();
- if (r == null) {
- LOG.warn("StoreFile " + s + " has a null Reader");
- continue;
- }
- if (!s.isHFile()) {
- continue;
- }
- long createdTS = s.getFileInfo().getCreatedTimestamp();
- latestTS = (createdTS > latestTS) ? createdTS : latestTS;
- }
- long now = EnvironmentEdgeManager.currentTime();
- return now - latestTS;
+ return EnvironmentEdgeManager.currentTime() -
+ getStoreFileCreatedTimestampStream().max().orElse(0L);
}
@Override
public long getAvgStoreFileAge() {
- long sum = 0, count = 0;
- for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
- StoreFileReader r = s.getReader();
- if (r == null) {
- LOG.warn("StoreFile " + s + " has a null Reader");
- continue;
- }
- if (!s.isHFile()) {
- continue;
- }
- sum += s.getFileInfo().getCreatedTimestamp();
- count++;
- }
- if (count == 0) {
- return 0;
- }
- long avgTS = sum / count;
- long now = EnvironmentEdgeManager.currentTime();
- return now - avgTS;
+ OptionalDouble avg = getStoreFileCreatedTimestampStream().average();
+ return avg.isPresent() ? EnvironmentEdgeManager.currentTime() - (long) avg.getAsDouble() : 0L;
}
@Override
public long getNumReferenceFiles() {
- long numRefFiles = 0;
- for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
- if (s.isReference()) {
- numRefFiles++;
- }
- }
- return numRefFiles;
+ return this.storeEngine.getStoreFileManager().getStorefiles().stream()
+ .filter(HStoreFile::isReference).count();
}
@Override
public long getNumHFiles() {
- long numHFiles = 0;
- for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
- if (s.isHFile()) {
- numHFiles++;
- }
- }
- return numHFiles;
+ return this.storeEngine.getStoreFileManager().getStorefiles().stream()
+ .filter(HStoreFile::isHFile).count();
}
@Override
@@ -2074,59 +2090,41 @@ public class HStore implements Store {
return getStorefilesSize(storeFile -> storeFile.isHFile());
}
- private long getStorefilesSize(Predicate<StoreFile> predicate) {
- long size = 0;
- for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
- StoreFileReader r = s.getReader();
- if (r == null) {
- LOG.warn("StoreFile " + s + " has a null Reader");
- continue;
+ private long getStorefilesSize(Predicate<HStoreFile> predicate) {
+ return this.storeEngine.getStoreFileManager().getStorefiles().stream().filter(sf -> {
+ if (sf.getReader() == null) {
+ LOG.warn("StoreFile " + sf + " has a null Reader");
+ return false;
+ } else {
+ return true;
}
- if (predicate.test(s)) {
- size += r.length();
+ }).filter(predicate).mapToLong(sf -> sf.getReader().length()).sum();
+ }
+
+ private long getStoreFileFieldSize(ToLongFunction<StoreFileReader> f) {
+ return this.storeEngine.getStoreFileManager().getStorefiles().stream().filter(sf -> {
+ if (sf.getReader() == null) {
+ LOG.warn("StoreFile " + sf + " has a null Reader");
+ return false;
+ } else {
+ return true;
}
- }
- return size;
+ }).map(HStoreFile::getReader).mapToLong(f).sum();
}
@Override
public long getStorefilesIndexSize() {
- long size = 0;
- for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
- StoreFileReader r = s.getReader();
- if (r == null) {
- LOG.warn("StoreFile " + s + " has a null Reader");
- continue;
- }
- size += r.indexSize();
- }
- return size;
+ return getStoreFileFieldSize(StoreFileReader::indexSize);
}
@Override
public long getTotalStaticIndexSize() {
- long size = 0;
- for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
- StoreFileReader r = s.getReader();
- if (r == null) {
- continue;
- }
- size += r.getUncompressedDataIndexSize();
- }
- return size;
+ return getStoreFileFieldSize(StoreFileReader::getUncompressedDataIndexSize);
}
@Override
public long getTotalStaticBloomSize() {
- long size = 0;
- for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
- StoreFileReader r = s.getReader();
- if (r == null) {
- continue;
- }
- size += r.getTotalBloomSize();
- }
- return size;
+ return getStoreFileFieldSize(StoreFileReader::getTotalBloomSize);
}
@Override
@@ -2247,19 +2245,19 @@ public class HStore implements Store {
if (this.tempFiles == null || this.tempFiles.isEmpty()) {
return false;
}
- List<StoreFile> storeFiles = new ArrayList<>(this.tempFiles.size());
+ List<HStoreFile> storeFiles = new ArrayList<>(this.tempFiles.size());
for (Path storeFilePath : tempFiles) {
try {
- StoreFile sf = HStore.this.commitFile(storeFilePath, cacheFlushSeqNum, status);
+ HStoreFile sf = HStore.this.commitFile(storeFilePath, cacheFlushSeqNum, status);
outputFileSize += sf.getReader().length();
storeFiles.add(sf);
} catch (IOException ex) {
LOG.error("Failed to commit store file " + storeFilePath, ex);
// Try to delete the files we have committed before.
- for (StoreFile sf : storeFiles) {
+ for (HStoreFile sf : storeFiles) {
Path pathToDelete = sf.getPath();
try {
- sf.deleteReader();
+ sf.deleteStoreFile();
} catch (IOException deleteEx) {
LOG.fatal("Failed to delete store file we committed, halting " + pathToDelete, ex);
Runtime.getRuntime().halt(1);
@@ -2269,7 +2267,7 @@ public class HStore implements Store {
}
}
- for (StoreFile sf : storeFiles) {
+ for (HStoreFile sf : storeFiles) {
if (HStore.this.getCoprocessorHost() != null) {
HStore.this.getCoprocessorHost().postFlush(HStore.this, sf);
}
@@ -2305,11 +2303,11 @@ public class HStore implements Store {
@Override
public void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot)
throws IOException {
- List<StoreFile> storeFiles = new ArrayList<>(fileNames.size());
+ List<HStoreFile> storeFiles = new ArrayList<>(fileNames.size());
for (String file : fileNames) {
// open the file as a store file (hfile link, etc)
StoreFileInfo storeFileInfo = fs.getStoreFileInfo(getColumnFamilyName(), file);
- StoreFile storeFile = createStoreFileAndReader(storeFileInfo);
+ HStoreFile storeFile = createStoreFileAndReader(storeFileInfo);
storeFiles.add(storeFile);
HStore.this.storeSize += storeFile.getReader().length();
HStore.this.totalUncompressedBytes += storeFile.getReader().getTotalUncompressedBytes();
@@ -2498,9 +2496,9 @@ public class HStore implements Store {
archiveLock.lock();
try {
lock.readLock().lock();
- Collection<StoreFile> copyCompactedfiles = null;
+ Collection<HStoreFile> copyCompactedfiles = null;
try {
- Collection<StoreFile> compactedfiles =
+ Collection<HStoreFile> compactedfiles =
this.getStoreEngine().getStoreFileManager().getCompactedfiles();
if (compactedfiles != null && compactedfiles.size() != 0) {
// Do a copy under read lock
@@ -2527,10 +2525,10 @@ public class HStore implements Store {
* @param compactedfiles The compacted files in this store that are not active in reads
* @throws IOException
*/
- private void removeCompactedfiles(Collection<StoreFile> compactedfiles)
+ private void removeCompactedfiles(Collection<HStoreFile> compactedfiles)
throws IOException {
- final List<StoreFile> filesToRemove = new ArrayList<>(compactedfiles.size());
- for (final StoreFile file : compactedfiles) {
+ final List<HStoreFile> filesToRemove = new ArrayList<>(compactedfiles.size());
+ for (final HStoreFile file : compactedfiles) {
synchronized (file) {
try {
StoreFileReader r = file.getReader();
@@ -2573,7 +2571,7 @@ public class HStore implements Store {
// files which were successfully archived. Otherwise we will receive a
// FileNotFoundException when we attempt to re-archive them in the next go around.
Collection<Path> failedFiles = fae.getFailedFiles();
- Iterator<StoreFile> iter = filesToRemove.iterator();
+ Iterator<HStoreFile> iter = filesToRemove.iterator();
while (iter.hasNext()) {
if (failedFiles.contains(iter.next().getPath())) {
iter.remove();
@@ -2601,7 +2599,7 @@ public class HStore implements Store {
return this.memstore.isSloppy();
}
- private void clearCompactedfiles(final List<StoreFile> filesToRemove) throws IOException {
+ private void clearCompactedfiles(List<HStoreFile> filesToRemove) throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("Clearing the compacted file " + filesToRemove + " from this store");
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
index c43b788..a79af13 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
@@ -20,8 +20,8 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.Collections;
-import java.util.Comparator;
import java.util.Map;
+import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -32,14 +32,15 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
-import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
@@ -61,8 +62,50 @@ public class HStoreFile implements StoreFile {
private static final Log LOG = LogFactory.getLog(HStoreFile.class.getName());
+ public static final String STORE_FILE_READER_NO_READAHEAD = "hbase.store.reader.no-readahead";
+
private static final boolean DEFAULT_STORE_FILE_READER_NO_READAHEAD = false;
+ // Keys for fileinfo values in HFile
+
+ /** Max Sequence ID in FileInfo */
+ public static final byte[] MAX_SEQ_ID_KEY = Bytes.toBytes("MAX_SEQ_ID_KEY");
+
+ /** Major compaction flag in FileInfo */
+ public static final byte[] MAJOR_COMPACTION_KEY = Bytes.toBytes("MAJOR_COMPACTION_KEY");
+
+ /** Minor compaction flag in FileInfo */
+ public static final byte[] EXCLUDE_FROM_MINOR_COMPACTION_KEY =
+ Bytes.toBytes("EXCLUDE_FROM_MINOR_COMPACTION");
+
+ /** Bloom filter Type in FileInfo */
+ public static final byte[] BLOOM_FILTER_TYPE_KEY = Bytes.toBytes("BLOOM_FILTER_TYPE");
+
+ /** Delete Family Count in FileInfo */
+ public static final byte[] DELETE_FAMILY_COUNT = Bytes.toBytes("DELETE_FAMILY_COUNT");
+
+ /** Last Bloom filter key in FileInfo */
+ public static final byte[] LAST_BLOOM_KEY = Bytes.toBytes("LAST_BLOOM_KEY");
+
+ /** Key for Timerange information in metadata */
+ public static final byte[] TIMERANGE_KEY = Bytes.toBytes("TIMERANGE");
+
+ /** Key for timestamp of earliest-put in metadata */
+ public static final byte[] EARLIEST_PUT_TS = Bytes.toBytes("EARLIEST_PUT_TS");
+
+ /** Key for the number of mob cells in metadata */
+ public static final byte[] MOB_CELLS_COUNT = Bytes.toBytes("MOB_CELLS_COUNT");
+
+ /** Meta key set when store file is a result of a bulk load */
+ public static final byte[] BULKLOAD_TASK_KEY = Bytes.toBytes("BULKLOAD_SOURCE_TASK");
+ public static final byte[] BULKLOAD_TIME_KEY = Bytes.toBytes("BULKLOAD_TIMESTAMP");
+
+ /**
+ * Key for skipping resetting sequence id in metadata. For bulk loaded hfiles, the scanner resets
+ * the cell seqId with the latest one, if this metadata is set as true, the reset is skipped.
+ */
+ public static final byte[] SKIP_RESET_SEQ_ID = Bytes.toBytes("SKIP_RESET_SEQ_ID");
+
private final StoreFileInfo fileInfo;
private final FileSystem fs;
@@ -90,29 +133,28 @@ public class HStoreFile implements StoreFile {
private long maxMemstoreTS = -1;
// firstKey, lastkey and cellComparator will be set when openReader.
- private Cell firstKey;
+ private Optional<Cell> firstKey;
- private Cell lastKey;
+ private Optional<Cell> lastKey;
- private Comparator<Cell> comparator;
+ private CellComparator comparator;
- @Override
public CacheConfig getCacheConf() {
return cacheConf;
}
@Override
- public Cell getFirstKey() {
+ public Optional<Cell> getFirstKey() {
return firstKey;
}
@Override
- public Cell getLastKey() {
+ public Optional<Cell> getLastKey() {
return lastKey;
}
@Override
- public Comparator<Cell> getComparator() {
+ public CellComparator getComparator() {
return comparator;
}
@@ -155,27 +197,6 @@ public class HStoreFile implements StoreFile {
* configuration. This may or may not be the same as the Bloom filter type actually
* present in the HFile, because column family configuration might change. If this is
* {@link BloomType#NONE}, the existing Bloom filter is ignored.
- * @deprecated Now we will specific whether the StoreFile is for primary replica when
- * constructing, so please use {@link #HStoreFile(FileSystem, Path, Configuration,
- * CacheConfig, BloomType, boolean)} directly.
- */
- @Deprecated
- public HStoreFile(final FileSystem fs, final Path p, final Configuration conf,
- final CacheConfig cacheConf, final BloomType cfBloomType) throws IOException {
- this(fs, new StoreFileInfo(conf, fs, p), conf, cacheConf, cfBloomType);
- }
-
- /**
- * Constructor, loads a reader and it's indices, etc. May allocate a substantial amount of ram
- * depending on the underlying files (10-20MB?).
- * @param fs The current file system to use.
- * @param p The path of the file.
- * @param conf The current configuration.
- * @param cacheConf The cache configuration and block cache reference.
- * @param cfBloomType The bloom type to use for this store file as specified by column family
- * configuration. This may or may not be the same as the Bloom filter type actually
- * present in the HFile, because column family configuration might change. If this is
- * {@link BloomType#NONE}, the existing Bloom filter is ignored.
* @param primaryReplica true if this is a store file for primary replica, otherwise false.
* @throws IOException
*/
@@ -187,27 +208,6 @@ public class HStoreFile implements StoreFile {
/**
* Constructor, loads a reader and it's indices, etc. May allocate a substantial amount of ram
* depending on the underlying files (10-20MB?).
- * @param fs The current file system to use.
- * @param fileInfo The store file information.
- * @param conf The current configuration.
- * @param cacheConf The cache configuration and block cache reference.
- * @param cfBloomType The bloom type to use for this store file as specified by column family
- * configuration. This may or may not be the same as the Bloom filter type actually
- * present in the HFile, because column family configuration might change. If this is
- * {@link BloomType#NONE}, the existing Bloom filter is ignored.
- * @deprecated Now we will specific whether the StoreFile is for primary replica when
- * constructing, so please use {@link #HStoreFile(FileSystem, StoreFileInfo,
- * Configuration, CacheConfig, BloomType, boolean)} directly.
- */
- @Deprecated
- public HStoreFile(final FileSystem fs, final StoreFileInfo fileInfo, final Configuration conf,
- final CacheConfig cacheConf, final BloomType cfBloomType) throws IOException {
- this(fs, fileInfo, conf, cacheConf, cfBloomType, true);
- }
-
- /**
- * Constructor, loads a reader and it's indices, etc. May allocate a substantial amount of ram
- * depending on the underlying files (10-20MB?).
* @param fs fs The current file system to use.
* @param fileInfo The store file information.
* @param conf The current configuration.
@@ -235,7 +235,10 @@ public class HStoreFile implements StoreFile {
this.primaryReplica = primaryReplica;
}
- @Override
+ /**
+ * @return the StoreFile object associated to this StoreFile. null if the StoreFile is not a
+ * reference.
+ */
public StoreFileInfo getFileInfo() {
return this.fileInfo;
}
@@ -283,7 +286,11 @@ public class HStoreFile implements StoreFile {
return fileInfo.getModificationTime();
}
- @Override
+ /**
+ * Only used by the Striped Compaction Policy
+ * @param key
+ * @return value associated with the metadata key
+ */
public byte[] getMetadataValue(byte[] key) {
return metadataMap.get(key);
}
@@ -299,7 +306,6 @@ public class HStoreFile implements StoreFile {
return bulkLoadedHFile || (metadataMap != null && metadataMap.containsKey(BULKLOAD_TIME_KEY));
}
- @Override
public boolean isCompactedAway() {
return compactedAway;
}
@@ -309,7 +315,9 @@ public class HStoreFile implements StoreFile {
return refCount.get();
}
- @Override
+ /**
+ * @return true if the file is still used in reads
+ */
public boolean isReferencedInReads() {
int rc = refCount.get();
assert rc >= 0; // we should not go negative.
@@ -331,7 +339,7 @@ public class HStoreFile implements StoreFile {
/**
* Opens reader on this store file. Called by Constructor.
* @throws IOException
- * @see #closeReader(boolean)
+ * @see #closeStoreFile(boolean)
*/
private void open() throws IOException {
if (this.reader != null) {
@@ -440,7 +448,9 @@ public class HStoreFile implements StoreFile {
comparator = reader.getComparator();
}
- @Override
+ /**
+ * Initialize the reader used for pread.
+ */
public void initReader() throws IOException {
if (reader == null) {
try {
@@ -448,7 +458,7 @@ public class HStoreFile implements StoreFile {
} catch (Exception e) {
try {
boolean evictOnClose = cacheConf != null ? cacheConf.shouldEvictOnClose() : true;
- this.closeReader(evictOnClose);
+ this.closeStoreFile(evictOnClose);
} catch (IOException ee) {
LOG.warn("failed to close reader", ee);
}
@@ -465,14 +475,22 @@ public class HStoreFile implements StoreFile {
return reader;
}
- @Override
+ /**
+ * Get a scanner which uses pread.
+ * <p>
+ * Must be called after initReader.
+ */
public StoreFileScanner getPreadScanner(boolean cacheBlocks, long readPt, long scannerOrder,
boolean canOptimizeForNonNullColumn) {
return getReader().getStoreFileScanner(cacheBlocks, true, false, readPt, scannerOrder,
canOptimizeForNonNullColumn);
}
- @Override
+ /**
+ * Get a scanner which uses streaming read.
+ * <p>
+ * Must be called after initReader.
+ */
public StoreFileScanner getStreamScanner(boolean canUseDropBehind, boolean cacheBlocks,
boolean isCompaction, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn)
throws IOException {
@@ -480,31 +498,37 @@ public class HStoreFile implements StoreFile {
isCompaction, readPt, scannerOrder, canOptimizeForNonNullColumn);
}
- @Override
+ /**
+ * @return Current reader. Must call initReader first else returns null.
+ * @see #initReader()
+ */
public StoreFileReader getReader() {
return this.reader;
}
- @Override
- public synchronized void closeReader(boolean evictOnClose)
- throws IOException {
+ /**
+ * @param evictOnClose whether to evict blocks belonging to this file
+ * @throws IOException
+ */
+ public synchronized void closeStoreFile(boolean evictOnClose) throws IOException {
if (this.reader != null) {
this.reader.close(evictOnClose);
this.reader = null;
}
}
- @Override
- public void markCompactedAway() {
- this.compactedAway = true;
+ /**
+ * Delete this file
+ * @throws IOException
+ */
+ public void deleteStoreFile() throws IOException {
+ boolean evictOnClose = cacheConf != null ? cacheConf.shouldEvictOnClose() : true;
+ closeStoreFile(evictOnClose);
+ this.fs.delete(getPath(), true);
}
- @Override
- public void deleteReader() throws IOException {
- boolean evictOnClose =
- cacheConf != null? cacheConf.shouldEvictOnClose(): true;
- closeReader(evictOnClose);
- this.fs.delete(getPath(), true);
+ public void markCompactedAway() {
+ this.compactedAway = true;
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
index 4528517..8af33b6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
@@ -246,30 +246,27 @@ public class MemStoreCompactor {
MemStoreSegmentsIterator iterator = null;
switch (action) {
- case COMPACT:
- iterator =
- new MemStoreCompactorSegmentsIterator(versionedList.getStoreSegments(),
- compactingMemStore.getComparator(),
- compactionKVMax, compactingMemStore.getStore());
+ case COMPACT:
+ iterator = new MemStoreCompactorSegmentsIterator(versionedList.getStoreSegments(),
+ compactingMemStore.getComparator(), compactionKVMax, compactingMemStore.getStore());
- result = SegmentFactory.instance().createImmutableSegmentByCompaction(
+ result = SegmentFactory.instance().createImmutableSegmentByCompaction(
compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator,
versionedList.getNumOfCells(), compactingMemStore.getIndexType());
- iterator.close();
- break;
- case MERGE:
- iterator =
- new MemStoreMergerSegmentsIterator(versionedList.getStoreSegments(),
- compactingMemStore.getComparator(),
- compactionKVMax);
+ iterator.close();
+ break;
+ case MERGE:
+ iterator = new MemStoreMergerSegmentsIterator(versionedList.getStoreSegments(),
+ compactingMemStore.getComparator(), compactionKVMax);
- result = SegmentFactory.instance().createImmutableSegmentByMerge(
+ result = SegmentFactory.instance().createImmutableSegmentByMerge(
compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator,
versionedList.getNumOfCells(), versionedList.getStoreSegments(),
compactingMemStore.getIndexType());
- iterator.close();
- break;
- default: throw new RuntimeException("Unknown action " + action); // sanity check
+ iterator.close();
+ break;
+ default:
+ throw new RuntimeException("Unknown action " + action); // sanity check
}
return result;
http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java
index 3d88955..b3ba998 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java
@@ -45,10 +45,8 @@ public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator
private StoreScanner compactingScanner;
// C-tor
- public MemStoreCompactorSegmentsIterator(
- List<ImmutableSegment> segments,
- CellComparator comparator, int compactionKVMax, Store store
- ) throws IOException {
+ public MemStoreCompactorSegmentsIterator(List<ImmutableSegment> segments,
+ CellComparator comparator, int compactionKVMax, HStore store) throws IOException {
super(compactionKVMax);
List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
@@ -108,7 +106,7 @@ public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator
* Creates the scanner for compacting the pipeline.
* @return the scanner
*/
- private StoreScanner createScanner(Store store, List<KeyValueScanner> scanners)
+ private StoreScanner createScanner(HStore store, List<KeyValueScanner> scanners)
throws IOException {
// Get all available versions
return new StoreScanner(store, store.getScanInfo(), OptionalInt.of(Integer.MAX_VALUE), scanners,
http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java
index 9bdeedc..b9f9af8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java
@@ -23,9 +23,9 @@ import java.util.List;
import java.util.NavigableSet;
import org.apache.hadoop.hbase.Cell;
-import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.yetus.audience.InterfaceAudience;
/**
* Scanner scans both the memstore and the MOB Store. Coalesce KeyValue stream into
@@ -39,7 +39,7 @@ public class MobStoreScanner extends StoreScanner {
private boolean readEmptyValueOnMobCellMiss = false;
private final HMobStore mobStore;
- public MobStoreScanner(Store store, ScanInfo scanInfo, Scan scan,
+ public MobStoreScanner(HStore store, ScanInfo scanInfo, Scan scan,
final NavigableSet<byte[]> columns, long readPt) throws IOException {
super(store, scanInfo, scan, columns, readPt);
cacheMobBlocks = MobUtils.isCacheMobBlocks(scan);
http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
index 044c4dc..fe0f30e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
@@ -499,7 +499,7 @@ public class RegionCoprocessorHost
* {@link RegionObserver#preCompactScannerOpen(ObserverContext, Store, List, ScanType, long,
* InternalScanner, CompactionLifeCycleTracker, long)}
*/
- public InternalScanner preCompactScannerOpen(Store store, List<StoreFileScanner> scanners,
+ public InternalScanner preCompactScannerOpen(HStore store, List<StoreFileScanner> scanners,
ScanType scanType, long earliestPutTs, CompactionLifeCycleTracker tracker, User user,
long readPoint) throws IOException {
return execOperationWithResult(null,
@@ -514,7 +514,7 @@ public class RegionCoprocessorHost
}
/**
- * Called prior to selecting the {@link StoreFile}s for compaction from the list of currently
+ * Called prior to selecting the {@link HStoreFile}s for compaction from the list of currently
* available candidates.
* @param store The store where compaction is being requested
* @param candidates The currently available store files
@@ -522,7 +522,7 @@ public class RegionCoprocessorHost
* @return If {@code true}, skip the normal selection process and use the current list
* @throws IOException
*/
- public boolean preCompactSelection(Store store, List<StoreFile> candidates,
+ public boolean preCompactSelection(HStore store, List<HStoreFile> candidates,
CompactionLifeCycleTracker tracker, User user) throws IOException {
return execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) {
@Override
@@ -534,13 +534,13 @@ public class RegionCoprocessorHost
}
/**
- * Called after the {@link StoreFile}s to be compacted have been selected from the available
+ * Called after the {@link HStoreFile}s to be compacted have been selected from the available
* candidates.
* @param store The store where compaction is being requested
* @param selected The store files selected to compact
* @param tracker used to track the life cycle of a compaction
*/
- public void postCompactSelection(Store store, ImmutableList<StoreFile> selected,
+ public void postCompactSelection(HStore store, ImmutableList<HStoreFile> selected,
CompactionLifeCycleTracker tracker, User user) throws IOException {
execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) {
@Override
@@ -559,7 +559,7 @@ public class RegionCoprocessorHost
* @param tracker used to track the life cycle of a compaction
* @throws IOException
*/
- public InternalScanner preCompact(Store store, InternalScanner scanner, ScanType scanType,
+ public InternalScanner preCompact(HStore store, InternalScanner scanner, ScanType scanType,
CompactionLifeCycleTracker tracker, User user) throws IOException {
return execOperationWithResult(false, scanner,
coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>(user) {
@@ -578,7 +578,7 @@ public class RegionCoprocessorHost
* @param tracker used to track the life cycle of a compaction
* @throws IOException
*/
- public void postCompact(Store store, StoreFile resultFile, CompactionLifeCycleTracker tracker,
+ public void postCompact(HStore store, HStoreFile resultFile, CompactionLifeCycleTracker tracker,
User user) throws IOException {
execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) {
@Override
@@ -593,7 +593,7 @@ public class RegionCoprocessorHost
* Invoked before a memstore flush
* @throws IOException
*/
- public InternalScanner preFlush(final Store store, final InternalScanner scanner)
+ public InternalScanner preFlush(HStore store, final InternalScanner scanner)
throws IOException {
return execOperationWithResult(false, scanner,
coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
@@ -623,16 +623,16 @@ public class RegionCoprocessorHost
* See
* {@link RegionObserver#preFlushScannerOpen(ObserverContext, Store, List, InternalScanner, long)}
*/
- public InternalScanner preFlushScannerOpen(final Store store,
- final List<KeyValueScanner> scanners, final long readPoint) throws IOException {
+ public InternalScanner preFlushScannerOpen(HStore store, List<KeyValueScanner> scanners,
+ long readPoint) throws IOException {
return execOperationWithResult(null,
- coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
- @Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- setResult(oserver.preFlushScannerOpen(ctx, store, scanners, getResult(), readPoint));
- }
- });
+ coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ setResult(oserver.preFlushScannerOpen(ctx, store, scanners, getResult(), readPoint));
+ }
+ });
}
/**
@@ -653,7 +653,7 @@ public class RegionCoprocessorHost
* Invoked after a memstore flush
* @throws IOException
*/
- public void postFlush(final Store store, final StoreFile storeFile) throws IOException {
+ public void postFlush(HStore store, HStoreFile storeFile) throws IOException {
execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
@Override
public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
@@ -1136,16 +1136,16 @@ public class RegionCoprocessorHost
* See
* {@link RegionObserver#preStoreScannerOpen(ObserverContext, Store, Scan, NavigableSet, KeyValueScanner, long)}
*/
- public KeyValueScanner preStoreScannerOpen(final Store store, final Scan scan,
- final NavigableSet<byte[]> targetCols, final long readPt) throws IOException {
+ public KeyValueScanner preStoreScannerOpen(HStore store, Scan scan,
+ NavigableSet<byte[]> targetCols, long readPt) throws IOException {
return execOperationWithResult(null,
- coprocessors.isEmpty() ? null : new RegionOperationWithResult<KeyValueScanner>() {
- @Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- setResult(oserver.preStoreScannerOpen(ctx, store, scan, targetCols, getResult(), readPt));
- }
- });
+ coprocessors.isEmpty() ? null : new RegionOperationWithResult<KeyValueScanner>() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ setResult(oserver.preStoreScannerOpen(ctx, store, scan, targetCols, getResult(), readPt));
+ }
+ });
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionSplitPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionSplitPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionSplitPolicy.java
index 5ccd6e3..71b7b9b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionSplitPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionSplitPolicy.java
@@ -19,14 +19,15 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.List;
+import java.util.Optional;
-import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
@@ -80,12 +81,12 @@ public abstract class RegionSplitPolicy extends Configured {
byte[] splitPointFromLargestStore = null;
long largestStoreSize = 0;
- for (Store s : stores) {
- byte[] splitPoint = s.getSplitPoint();
+ for (HStore s : stores) {
+ Optional<byte[]> splitPoint = s.getSplitPoint();
// Store also returns null if it has references as way of indicating it is not splittable
long storeSize = s.getSize();
- if (splitPoint != null && largestStoreSize < storeSize) {
- splitPointFromLargestStore = splitPoint;
+ if (splitPoint.isPresent() && largestStoreSize < storeSize) {
+ splitPointFromLargestStore = splitPoint.get();
largestStoreSize = storeSize;
}
}
@@ -131,7 +132,7 @@ public abstract class RegionSplitPolicy extends Configured {
/**
* In {@link HRegionFileSystem#splitStoreFile(org.apache.hadoop.hbase.HRegionInfo, String,
- * StoreFile, byte[], boolean, RegionSplitPolicy)} we are not creating the split reference
+ * HStoreFile, byte[], boolean, RegionSplitPolicy)} we are not creating the split reference
* if split row not lies in the StoreFile range. But in some use cases we may need to create
* the split reference even when the split row not lies in the range. This method can be used
* to decide, whether to skip the the StoreFile range check or not.
http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java
index bfe20ba..d64c372 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java
@@ -23,9 +23,9 @@ import java.util.List;
import java.util.NavigableSet;
import org.apache.hadoop.hbase.Cell;
-import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.yetus.audience.InterfaceAudience;
/**
* ReversedMobStoreScanner extends from ReversedStoreScanner, and is used to support
@@ -40,7 +40,7 @@ public class ReversedMobStoreScanner extends ReversedStoreScanner {
private boolean readEmptyValueOnMobCellMiss = false;
protected final HMobStore mobStore;
- ReversedMobStoreScanner(Store store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
+ ReversedMobStoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
long readPt) throws IOException {
super(store, scanInfo, scan, columns, readPt);
cacheMobBlocks = MobUtils.isCacheMobBlocks(scan);
http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
index 04e77e9..0089d3f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
@@ -22,12 +22,12 @@ import java.io.IOException;
import java.util.List;
import java.util.NavigableSet;
-import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.yetus.audience.InterfaceAudience;
/**
* ReversedStoreScanner extends from StoreScanner, and is used to support
@@ -46,7 +46,7 @@ class ReversedStoreScanner extends StoreScanner implements KeyValueScanner {
* @param columns which columns we are scanning
* @throws IOException
*/
- ReversedStoreScanner(Store store, ScanInfo scanInfo, Scan scan,
+ ReversedStoreScanner(HStore store, ScanInfo scanInfo, Scan scan,
NavigableSet<byte[]> columns, long readPt)
throws IOException {
super(store, scanInfo, scan, columns, readPt);