Posted to commits@hbase.apache.org by zh...@apache.org on 2017/04/19 01:48:25 UTC
[3/3] hbase git commit: HBASE-17914 Create a new reader instead of cloning a new StoreFile when compaction
HBASE-17914 Create a new reader instead of cloning a new StoreFile when compaction
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/66b616d7
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/66b616d7
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/66b616d7
Branch: refs/heads/master
Commit: 66b616d7a3d6f4ad6d20962e2dfc0c82a4092ddb
Parents: 719a30b
Author: zhangduo <zh...@apache.org>
Authored: Mon Apr 17 22:53:49 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Wed Apr 19 09:26:33 2017 +0800
----------------------------------------------------------------------
.../hbase/io/FSDataInputStreamWrapper.java | 63 +++---
.../org/apache/hadoop/hbase/io/FileLink.java | 14 +-
.../hadoop/hbase/io/HalfStoreFileReader.java | 13 +-
.../hadoop/hbase/io/hfile/CacheConfig.java | 9 +-
.../org/apache/hadoop/hbase/io/hfile/HFile.java | 85 ++++----
.../hbase/io/hfile/HFilePrettyPrinter.java | 2 +-
.../hadoop/hbase/io/hfile/HFileReaderImpl.java | 26 +--
.../hbase/mapreduce/LoadIncrementalHFiles.java | 45 ++--
.../procedure/MergeTableRegionsProcedure.java | 9 +-
.../procedure/SplitTableRegionProcedure.java | 8 +-
.../apache/hadoop/hbase/mob/CachedMobFile.java | 4 +-
.../org/apache/hadoop/hbase/mob/MobFile.java | 8 +-
.../org/apache/hadoop/hbase/mob/MobUtils.java | 13 +-
.../compactions/PartitionedMobCompactor.java | 26 +--
.../regionserver/DefaultStoreFileManager.java | 2 +-
.../hadoop/hbase/regionserver/HMobStore.java | 6 +-
.../hadoop/hbase/regionserver/HRegion.java | 4 +-
.../hbase/regionserver/HRegionFileSystem.java | 6 +-
.../hadoop/hbase/regionserver/HStore.java | 19 +-
.../regionserver/ReversedStoreScanner.java | 2 +-
.../hadoop/hbase/regionserver/StoreFile.java | 216 ++++++++++++-------
.../hbase/regionserver/StoreFileInfo.java | 21 +-
.../hbase/regionserver/StoreFileReader.java | 86 ++++----
.../hbase/regionserver/StoreFileScanner.java | 50 +++--
.../hadoop/hbase/regionserver/StoreScanner.java | 6 +-
.../regionserver/compactions/Compactor.java | 44 +---
.../hadoop/hbase/util/CompressionTest.java | 2 +-
.../org/apache/hadoop/hbase/util/HBaseFsck.java | 6 +-
.../hbase/util/hbck/HFileCorruptionChecker.java | 4 +-
.../hbase/HFilePerformanceEvaluation.java | 2 +-
.../hadoop/hbase/client/TestFromClientSide.java | 1 +
.../hbase/io/TestHalfStoreFileReader.java | 192 ++++++++---------
.../hadoop/hbase/io/hfile/TestCacheOnWrite.java | 2 +-
.../apache/hadoop/hbase/io/hfile/TestHFile.java | 8 +-
.../hbase/io/hfile/TestHFileBlockIndex.java | 6 +-
.../hbase/io/hfile/TestHFileEncryption.java | 6 +-
.../TestHFileInlineToRootChunkConversion.java | 2 +-
.../hadoop/hbase/io/hfile/TestPrefetch.java | 2 +-
.../hadoop/hbase/io/hfile/TestReseekTo.java | 4 +-
.../hfile/TestSeekBeforeWithInlineBlocks.java | 2 +-
.../hadoop/hbase/io/hfile/TestSeekTo.java | 8 +-
.../hbase/mapreduce/TestHFileOutputFormat2.java | 10 +-
.../TestImportTSVWithVisibilityLabels.java | 2 +-
.../hadoop/hbase/mapreduce/TestImportTsv.java | 2 +-
.../mapreduce/TestLoadIncrementalHFiles.java | 4 +-
.../apache/hadoop/hbase/mob/TestMobFile.java | 8 +-
.../hbase/mob/compactions/TestMobCompactor.java | 9 +-
.../TestPartitionedMobCompactor.java | 18 +-
.../regionserver/DataBlockEncodingTool.java | 7 +-
.../EncodedSeekPerformanceTest.java | 12 +-
.../hbase/regionserver/MockStoreFile.java | 25 ++-
.../regionserver/TestCacheOnWriteInSchema.java | 6 +-
.../regionserver/TestCompactionPolicy.java | 3 -
.../regionserver/TestCompoundBloomFilter.java | 7 +-
.../regionserver/TestEncryptionKeyRotation.java | 2 +-
.../TestEncryptionRandomKeying.java | 2 +-
.../hbase/regionserver/TestFSErrorsExposed.java | 12 +-
.../regionserver/TestMobStoreCompaction.java | 7 +-
.../regionserver/TestReversibleScanners.java | 33 ++-
.../hadoop/hbase/regionserver/TestStore.java | 2 +-
.../hbase/regionserver/TestStoreFile.java | 120 ++++++-----
.../TestStoreFileScannerWithTagCompression.java | 10 +-
.../regionserver/compactions/TestCompactor.java | 3 -
.../compactions/TestStripeCompactionPolicy.java | 3 -
.../hbase/util/TestHBaseFsckEncryption.java | 2 +-
.../hadoop/hbase/spark/BulkLoadSuite.scala | 8 +-
66 files changed, 701 insertions(+), 650 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
index b06be6b..055e46a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
@@ -17,11 +17,14 @@
*/
package org.apache.hadoop.hbase.io;
+import java.io.Closeable;
import java.io.IOException;
+import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.fs.HFileSystem;
import com.google.common.annotations.VisibleForTesting;
@@ -31,11 +34,14 @@ import com.google.common.annotations.VisibleForTesting;
* as well as closing streams. Initialization is not thread-safe, but normal operation is;
* see method comments.
*/
-public class FSDataInputStreamWrapper {
+@InterfaceAudience.Private
+public class FSDataInputStreamWrapper implements Closeable {
private final HFileSystem hfs;
private final Path path;
private final FileLink link;
private final boolean doCloseStreams;
+ private final boolean dropBehind;
+ private final long readahead;
/** Two stream handles, one with and one without FS-level checksum.
* HDFS checksum setting is on FS level, not single read level, so you have to keep two
@@ -75,43 +81,52 @@ public class FSDataInputStreamWrapper {
private volatile int hbaseChecksumOffCount = -1;
public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException {
- this(fs, null, path, false);
+ this(fs, path, false, -1L);
}
- public FSDataInputStreamWrapper(FileSystem fs, Path path, boolean dropBehind) throws IOException {
- this(fs, null, path, dropBehind);
+ public FSDataInputStreamWrapper(FileSystem fs, Path path, boolean dropBehind, long readahead) throws IOException {
+ this(fs, null, path, dropBehind, readahead);
}
- public FSDataInputStreamWrapper(FileSystem fs, FileLink link) throws IOException {
- this(fs, link, null, false);
- }
public FSDataInputStreamWrapper(FileSystem fs, FileLink link,
- boolean dropBehind) throws IOException {
- this(fs, link, null, dropBehind);
+ boolean dropBehind, long readahead) throws IOException {
+ this(fs, link, null, dropBehind, readahead);
}
- private FSDataInputStreamWrapper(FileSystem fs, FileLink link,
- Path path, boolean dropBehind) throws IOException {
+ private FSDataInputStreamWrapper(FileSystem fs, FileLink link, Path path, boolean dropBehind,
+ long readahead) throws IOException {
assert (path == null) != (link == null);
this.path = path;
this.link = link;
this.doCloseStreams = true;
+ this.dropBehind = dropBehind;
+ this.readahead = readahead;
// If the fs is not an instance of HFileSystem, then create an instance of HFileSystem
// that wraps over the specified fs. In this case, we will not be able to avoid
// checksumming inside the filesystem.
- this.hfs = (fs instanceof HFileSystem) ? (HFileSystem)fs : new HFileSystem(fs);
+ this.hfs = (fs instanceof HFileSystem) ? (HFileSystem) fs : new HFileSystem(fs);
// Initially we are going to read the tail block. Open the reader w/FS checksum.
this.useHBaseChecksumConfigured = this.useHBaseChecksum = false;
this.stream = (link != null) ? link.open(hfs) : hfs.open(path);
+ setStreamOptions(stream);
+ }
+
+ private void setStreamOptions(FSDataInputStream in) {
try {
in.setDropBehind(dropBehind);
} catch (Exception e) {
// Skipped.
}
+ if (readahead >= 0) {
+ try {
+ in.setReadahead(readahead);
+ } catch (Exception e) {
+ // Skipped.
+ }
+ }
}
-
/**
* Prepares the streams for block reader. NOT THREAD SAFE. Must be called once, after any
* reads finish and before any other reads start (what happens in reality is we read the
@@ -127,6 +142,7 @@ public class FSDataInputStreamWrapper {
if (useHBaseChecksum) {
FileSystem fsNc = hfs.getNoChecksumFs();
this.streamNoFsChecksum = (link != null) ? link.open(fsNc) : fsNc.open(path);
+ setStreamOptions(streamNoFsChecksum);
this.useHBaseChecksumConfigured = this.useHBaseChecksum = useHBaseChecksum;
// Close the checksum stream; we will reopen it if we get an HBase checksum failure.
this.stream.close();
@@ -150,6 +166,8 @@ public class FSDataInputStreamWrapper {
link = null;
hfs = null;
useHBaseChecksumConfigured = useHBaseChecksum = false;
+ dropBehind = false;
+ readahead = 0;
}
/**
@@ -201,19 +219,14 @@ public class FSDataInputStreamWrapper {
}
/** Close stream(s) if necessary. */
- public void close() throws IOException {
- if (!doCloseStreams) return;
- try {
- if (stream != streamNoFsChecksum && streamNoFsChecksum != null) {
- streamNoFsChecksum.close();
- streamNoFsChecksum = null;
- }
- } finally {
- if (stream != null) {
- stream.close();
- stream = null;
- }
+ @Override
+ public void close() {
+ if (!doCloseStreams) {
+ return;
}
+ // We do not care about exceptions on close, as the streams are read-only; no data loss concern.
+ IOUtils.closeQuietly(streamNoFsChecksum);
+ IOUtils.closeQuietly(stream);
}
public HFileSystem getHfs() {
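
Taken together, the wrapper now takes its drop-behind and readahead hints at construction time and implements Closeable, so callers can use try-with-resources. A minimal sketch of the new call pattern, assuming an existing FileSystem fs, Path path, and an enclosing method that declares throws IOException (the readahead value is illustrative):

    // Open with dropBehind enabled and a 64 KB readahead hint; pass -1L for
    // readahead to leave the filesystem default untouched (see the 2-arg ctor).
    try (FSDataInputStreamWrapper wrapper =
        new FSDataInputStreamWrapper(fs, path, true, 64 * 1024L)) {
      // HBase-level checksums start out disabled, so read via the FS-checksum stream.
      FSDataInputStream in = wrapper.getStream(wrapper.shouldUseHBaseChecksum());
      // ... read the trailer and blocks; close() is now quiet, no IOException on exit.
    }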
http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java
index ca0dfbc..8a79efb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java
@@ -29,6 +29,8 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.fs.CanSetDropBehind;
+import org.apache.hadoop.fs.CanSetReadahead;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
@@ -99,7 +101,7 @@ public class FileLink {
* and the alternative locations, when the file is moved.
*/
private static class FileLinkInputStream extends InputStream
- implements Seekable, PositionedReadable {
+ implements Seekable, PositionedReadable, CanSetDropBehind, CanSetReadahead {
private FSDataInputStream in = null;
private Path currentPath = null;
private long pos = 0;
@@ -306,6 +308,16 @@ public class FileLink {
}
throw new FileNotFoundException("Unable to open link: " + fileLink);
}
+
+ @Override
+ public void setReadahead(Long readahead) throws IOException, UnsupportedOperationException {
+ in.setReadahead(readahead);
+ }
+
+ @Override
+ public void setDropBehind(Boolean dropCache) throws IOException, UnsupportedOperationException {
+ in.setDropBehind(dropCache);
+ }
}
private Path[] locations = null;
http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
index a4a281e..c4dbc39 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.io;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -72,10 +73,10 @@ public class HalfStoreFileReader extends StoreFileReader {
* @param conf Configuration
* @throws IOException
*/
- public HalfStoreFileReader(final FileSystem fs, final Path p,
- final CacheConfig cacheConf, final Reference r, final Configuration conf)
+ public HalfStoreFileReader(FileSystem fs, Path p, CacheConfig cacheConf, Reference r,
+ boolean isPrimaryReplicaStoreFile, AtomicInteger refCount, boolean shared, Configuration conf)
throws IOException {
- super(fs, p, cacheConf, conf);
+ super(fs, p, cacheConf, isPrimaryReplicaStoreFile, refCount, shared, conf);
// This is not the actual midkey for this half-file; it's just the border
// around which we split top and bottom. Have to look in files to find
// actual last and first keys for bottom and top halves. Half-files don't
@@ -99,9 +100,9 @@ public class HalfStoreFileReader extends StoreFileReader {
* @throws IOException
*/
public HalfStoreFileReader(final FileSystem fs, final Path p, final FSDataInputStreamWrapper in,
- long size, final CacheConfig cacheConf, final Reference r, final Configuration conf)
- throws IOException {
- super(fs, p, in, size, cacheConf, conf);
+ long size, final CacheConfig cacheConf, final Reference r, boolean isPrimaryReplicaStoreFile,
+ AtomicInteger refCount, boolean shared, final Configuration conf) throws IOException {
+ super(fs, p, in, size, cacheConf, isPrimaryReplicaStoreFile, refCount, shared, conf);
// This is not the actual midkey for this half-file; it's just the border
// around which we split top and bottom. Have to look in files to find
// actual last and first keys for bottom and top halves. Half-files don't
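
The half reader now threads the primary-replica flag, a reference counter, and the shared-stream marker through to StoreFileReader. The LoadIncrementalHFiles hunk further down uses the new signature like this (a sketch; fs, inFile, cacheConf, reference, and conf are assumed to be in scope, with java.util.concurrent.atomic.AtomicInteger imported as above):

    // Half reader over a primary-replica store file: fresh ref counter,
    // shared=true because the underlying stream is managed by the store.
    HalfStoreFileReader halfReader = new HalfStoreFileReader(fs, inFile, cacheConf,
        reference, true, new AtomicInteger(0), true, conf);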
http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
index 4db60b5..791445b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
@@ -283,11 +283,10 @@ public class CacheConfig {
}
/**
- * Create a block cache configuration with the specified cache and
- * configuration parameters.
+ * Create a block cache configuration with the specified cache and configuration parameters.
* @param blockCache reference to block cache, null if completely disabled
* @param cacheDataOnRead whether DATA blocks should be cached on read (we always cache INDEX
- * blocks and BLOOM blocks; this cannot be disabled).
+ * blocks and BLOOM blocks; this cannot be disabled).
* @param inMemory whether blocks should be flagged as in-memory
* @param cacheDataOnWrite whether data blocks should be cached on write
* @param cacheIndexesOnWrite whether index blocks should be cached on write
@@ -296,7 +295,9 @@ public class CacheConfig {
* @param cacheDataCompressed whether to store blocks as compressed in the cache
* @param prefetchOnOpen whether to prefetch blocks upon open
* @param cacheDataInL1 If more than one cache tier deployed, if true, cache this column families
- * data blocks up in the L1 tier.
+ * data blocks up in the L1 tier.
+ * @param dropBehindCompaction indicates that drop-behind should be set to true when opening a
+ * store file reader for compaction
*/
CacheConfig(final BlockCache blockCache,
final boolean cacheDataOnRead, final boolean inMemory,
http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
index c5b334a..0887ee8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
@@ -36,6 +36,7 @@ import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.LongAdder;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -462,8 +463,6 @@ public class HFile {
boolean isPrimaryReplicaReader();
- void setPrimaryReplicaReader(boolean isPrimaryReplicaReader);
-
boolean shouldIncludeMemstoreTS();
boolean isDecodeMemstoreTS();
@@ -486,33 +485,32 @@ public class HFile {
* @param size max size of the trailer.
* @param cacheConf Cache configuration values, cannot be null.
* @param hfs
+ * @param primaryReplicaReader true if this is a reader for primary replica
* @return an appropriate instance of HFileReader
* @throws IOException If file is invalid, will throw CorruptHFileException flavored IOException
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SF_SWITCH_FALLTHROUGH",
justification="Intentional")
- private static Reader pickReaderVersion(Path path, FSDataInputStreamWrapper fsdis,
- long size, CacheConfig cacheConf, HFileSystem hfs, Configuration conf) throws IOException {
+ private static Reader pickReaderVersion(Path path, FSDataInputStreamWrapper fsdis, long size,
+ CacheConfig cacheConf, HFileSystem hfs, boolean primaryReplicaReader, Configuration conf)
+ throws IOException {
FixedFileTrailer trailer = null;
try {
boolean isHBaseChecksum = fsdis.shouldUseHBaseChecksum();
assert !isHBaseChecksum; // Initially we must read with FS checksum.
trailer = FixedFileTrailer.readFromStream(fsdis.getStream(isHBaseChecksum), size);
switch (trailer.getMajorVersion()) {
- case 2:
- LOG.debug("Opening HFile v2 with v3 reader");
- // Fall through. FindBugs: SF_SWITCH_FALLTHROUGH
- case 3 :
- return new HFileReaderImpl(path, trailer, fsdis, size, cacheConf, hfs, conf);
- default:
- throw new IllegalArgumentException("Invalid HFile version " + trailer.getMajorVersion());
+ case 2:
+ LOG.debug("Opening HFile v2 with v3 reader");
+ // Fall through. FindBugs: SF_SWITCH_FALLTHROUGH
+ case 3:
+ return new HFileReaderImpl(path, trailer, fsdis, size, cacheConf, hfs,
+ primaryReplicaReader, conf);
+ default:
+ throw new IllegalArgumentException("Invalid HFile version " + trailer.getMajorVersion());
}
} catch (Throwable t) {
- try {
- fsdis.close();
- } catch (Throwable t2) {
- LOG.warn("Error closing fsdis FSDataInputStreamWrapper", t2);
- }
+ IOUtils.closeQuietly(fsdis);
throw new CorruptHFileException("Problem reading HFile Trailer from file " + path, t);
}
}
@@ -523,13 +521,13 @@ public class HFile {
* @param fsdis a stream of path's file
* @param size max size of the trailer.
* @param cacheConf Cache configuration for hfile's contents
+ * @param primaryReplicaReader true if this is a reader for primary replica
* @param conf Configuration
* @return A version specific Hfile Reader
* @throws IOException If file is invalid, will throw CorruptHFileException flavored IOException
*/
- @SuppressWarnings("resource")
- public static Reader createReader(FileSystem fs, Path path,
- FSDataInputStreamWrapper fsdis, long size, CacheConfig cacheConf, Configuration conf)
+ public static Reader createReader(FileSystem fs, Path path, FSDataInputStreamWrapper fsdis,
+ long size, CacheConfig cacheConf, boolean primaryReplicaReader, Configuration conf)
throws IOException {
HFileSystem hfs = null;
@@ -540,9 +538,9 @@ public class HFile {
if (!(fs instanceof HFileSystem)) {
hfs = new HFileSystem(fs);
} else {
- hfs = (HFileSystem)fs;
+ hfs = (HFileSystem) fs;
}
- return pickReaderVersion(path, fsdis, size, cacheConf, hfs, conf);
+ return pickReaderVersion(path, fsdis, size, cacheConf, hfs, primaryReplicaReader, conf);
}
/**
@@ -553,35 +551,39 @@ public class HFile {
* @throws IOException Will throw a CorruptHFileException
* (DoNotRetryIOException subtype) if hfile is corrupt/invalid.
*/
- public static Reader createReader(
- FileSystem fs, Path path, Configuration conf) throws IOException {
- return createReader(fs, path, CacheConfig.DISABLED, conf);
- }
+ public static Reader createReader(FileSystem fs, Path path, Configuration conf)
+ throws IOException {
+ // The primaryReplicaReader flag is mainly used for constructing the block cache key, so if we
+ // do not use the block cache then any value is fine. We use true here.
+ return createReader(fs, path, CacheConfig.DISABLED, true, conf);
+ }
/**
- *
* @param fs filesystem
* @param path Path to file to read
- * @param cacheConf This must not be null. @see {@link org.apache.hadoop.hbase.io.hfile.CacheConfig#CacheConfig(Configuration)}
+ * @param cacheConf This must not be null. @see
+ * {@link org.apache.hadoop.hbase.io.hfile.CacheConfig#CacheConfig(Configuration)}
+ * @param primaryReplicaReader true if this is a reader for primary replica
* @return an active Reader instance
- * @throws IOException Will throw a CorruptHFileException (DoNotRetryIOException subtype) if hfile is corrupt/invalid.
+ * @throws IOException Will throw a CorruptHFileException (DoNotRetryIOException subtype) if hfile
+ * is corrupt/invalid.
*/
- public static Reader createReader(
- FileSystem fs, Path path, CacheConfig cacheConf, Configuration conf) throws IOException {
+ public static Reader createReader(FileSystem fs, Path path, CacheConfig cacheConf,
+ boolean primaryReplicaReader, Configuration conf) throws IOException {
Preconditions.checkNotNull(cacheConf, "Cannot create Reader with null CacheConf");
FSDataInputStreamWrapper stream = new FSDataInputStreamWrapper(fs, path);
- return pickReaderVersion(path, stream, fs.getFileStatus(path).getLen(),
- cacheConf, stream.getHfs(), conf);
+ return pickReaderVersion(path, stream, fs.getFileStatus(path).getLen(), cacheConf,
+ stream.getHfs(), primaryReplicaReader, conf);
}
/**
* This factory method is used only by unit tests
*/
- static Reader createReaderFromStream(Path path,
- FSDataInputStream fsdis, long size, CacheConfig cacheConf, Configuration conf)
- throws IOException {
+ @VisibleForTesting
+ static Reader createReaderFromStream(Path path, FSDataInputStream fsdis, long size,
+ CacheConfig cacheConf, Configuration conf) throws IOException {
FSDataInputStreamWrapper wrapper = new FSDataInputStreamWrapper(fsdis);
- return pickReaderVersion(path, wrapper, size, cacheConf, null, conf);
+ return pickReaderVersion(path, wrapper, size, cacheConf, null, true, conf);
}
/**
@@ -606,22 +608,13 @@ public class HFile {
throws IOException {
final Path path = fileStatus.getPath();
final long size = fileStatus.getLen();
- FSDataInputStreamWrapper fsdis = new FSDataInputStreamWrapper(fs, path);
- try {
+ try (FSDataInputStreamWrapper fsdis = new FSDataInputStreamWrapper(fs, path)) {
boolean isHBaseChecksum = fsdis.shouldUseHBaseChecksum();
assert !isHBaseChecksum; // Initially we must read with FS checksum.
FixedFileTrailer.readFromStream(fsdis.getStream(isHBaseChecksum), size);
return true;
} catch (IllegalArgumentException e) {
return false;
- } catch (IOException e) {
- throw e;
- } finally {
- try {
- fsdis.close();
- } catch (Throwable t) {
- LOG.warn("Error closing fsdis FSDataInputStreamWrapper: " + path, t);
- }
}
}
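
Net effect on the factory methods: any caller that supplies a CacheConfig must now also say whether the reader serves the primary replica (the flag feeds block cache key construction), while the two-argument convenience overload keeps its old shape and hard-codes true because it uses CacheConfig.DISABLED. Both shapes, sketched with fs, path, and conf assumed in scope:

    // Cache-aware reader: the new boolean marks this as a primary-replica reader.
    HFile.Reader cached = HFile.createReader(fs, path, new CacheConfig(conf), true, conf);

    // Cache-free convenience reader: block cache is disabled, so the flag is moot.
    HFile.Reader plain = HFile.createReader(fs, path, conf);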
http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java
index 030a25e..43b5c24 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java
@@ -306,7 +306,7 @@ public class HFilePrettyPrinter extends Configured implements Tool {
return -2;
}
- HFile.Reader reader = HFile.createReader(fs, file, new CacheConfig(getConf()), getConf());
+ HFile.Reader reader = HFile.createReader(fs, file, new CacheConfig(getConf()), true, getConf());
Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index 4e8cbaa..f0a1fa1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -85,7 +85,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
/** Filled when we read in the trailer. */
private final Compression.Algorithm compressAlgo;
- private boolean isPrimaryReplicaReader;
+ private final boolean primaryReplicaReader;
/**
* What kind of data block encoding should be used while reading, writing,
@@ -156,6 +156,14 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
/** Minor versions starting with this number have faked index key */
static final int MINOR_VERSION_WITH_FAKED_KEY = 3;
+ @VisibleForTesting
+ @Deprecated
+ public HFileReaderImpl(Path path, FixedFileTrailer trailer, FSDataInputStreamWrapper fsdis,
+ long fileSize, CacheConfig cacheConf, HFileSystem hfs, Configuration conf)
+ throws IOException {
+ this(path, trailer, fsdis, fileSize, cacheConf, hfs, true, conf);
+ }
+
/**
* Opens a HFile. You must load the index before you can use it by calling
* {@link #loadFileInfo()}.
@@ -175,11 +183,9 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
* Configuration
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD")
- public HFileReaderImpl(final Path path, FixedFileTrailer trailer,
- final FSDataInputStreamWrapper fsdis,
- final long fileSize, final CacheConfig cacheConf, final HFileSystem hfs,
- final Configuration conf)
- throws IOException {
+ public HFileReaderImpl(Path path, FixedFileTrailer trailer, FSDataInputStreamWrapper fsdis,
+ long fileSize, CacheConfig cacheConf, HFileSystem hfs, boolean primaryReplicaReader,
+ Configuration conf) throws IOException {
this.trailer = trailer;
this.compressAlgo = trailer.getCompressionCodec();
this.cacheConf = cacheConf;
@@ -187,6 +193,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
this.path = path;
this.name = path.getName();
this.conf = conf;
+ this.primaryReplicaReader = primaryReplicaReader;
checkFileVersion();
this.hfileContext = createHFileContext(fsdis, fileSize, hfs, path, trailer);
this.fsBlockReader = new HFileBlock.FSReaderImpl(fsdis, fileSize, hfs, path, hfileContext);
@@ -453,12 +460,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
@Override
public boolean isPrimaryReplicaReader() {
- return isPrimaryReplicaReader;
- }
-
- @Override
- public void setPrimaryReplicaReader(boolean isPrimaryReplicaReader) {
- this.isPrimaryReplicaReader = isPrimaryReplicaReader;
+ return primaryReplicaReader;
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 19daeed..3af4290 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -20,6 +20,11 @@ package org.apache.hadoop.hbase.mapreduce;
import static java.lang.String.format;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
@@ -27,7 +32,6 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
@@ -63,9 +67,6 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.backup.BackupType;
-import org.apache.hadoop.hbase.backup.impl.BackupManager;
-import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClientServiceCallable;
@@ -99,10 +100,6 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* Tool to load the output of HFileOutputFormat into an existing table.
*/
@@ -937,8 +934,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
}
HFile.Reader hfr = null;
try {
- hfr = HFile.createReader(fs, hfilePath,
- new CacheConfig(getConf()), getConf());
+ hfr = HFile.createReader(fs, hfilePath, new CacheConfig(getConf()), true, getConf());
} catch (FileNotFoundException fnfe) {
LOG.debug("encountered", fnfe);
return new Pair<>(null, hfilePath.getName());
@@ -1105,7 +1101,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
HalfStoreFileReader halfReader = null;
StoreFileWriter halfWriter = null;
try {
- halfReader = new HalfStoreFileReader(fs, inFile, cacheConf, reference, conf);
+ halfReader = new HalfStoreFileReader(fs, inFile, cacheConf, reference, true,
+ new AtomicInteger(0), true, conf);
Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();
int blocksize = familyDescriptor.getBlocksize();
@@ -1213,30 +1210,26 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
public void bulkHFile(final HColumnDescriptor hcd, final FileStatus hfileStatus)
throws IOException {
Path hfile = hfileStatus.getPath();
- HFile.Reader reader = HFile.createReader(fs, hfile,
- new CacheConfig(getConf()), getConf());
- try {
+ try (HFile.Reader reader =
+ HFile.createReader(fs, hfile, new CacheConfig(getConf()), true, getConf())) {
if (hcd.getCompressionType() != reader.getFileContext().getCompression()) {
hcd.setCompressionType(reader.getFileContext().getCompression());
- LOG.info("Setting compression " + hcd.getCompressionType().name() +
- " for family " + hcd.toString());
+ LOG.info("Setting compression " + hcd.getCompressionType().name() + " for family " +
+ hcd.toString());
}
reader.loadFileInfo();
byte[] first = reader.getFirstRowKey();
- byte[] last = reader.getLastRowKey();
+ byte[] last = reader.getLastRowKey();
- LOG.info("Trying to figure out region boundaries hfile=" + hfile +
- " first=" + Bytes.toStringBinary(first) +
- " last=" + Bytes.toStringBinary(last));
+ LOG.info("Trying to figure out region boundaries hfile=" + hfile + " first=" +
+ Bytes.toStringBinary(first) + " last=" + Bytes.toStringBinary(last));
// To eventually infer start key-end key boundaries
- Integer value = map.containsKey(first)? map.get(first):0;
- map.put(first, value+1);
+ Integer value = map.containsKey(first) ? map.get(first) : 0;
+ map.put(first, value + 1);
- value = map.containsKey(last)? map.get(last):0;
- map.put(last, value-1);
- } finally {
- reader.close();
+ value = map.containsKey(last) ? map.get(last) : 0;
+ map.put(last, value - 1);
}
}
});
http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java
index 366378a..3600fe0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java
@@ -264,7 +264,7 @@ public class MergeTableRegionsProcedure
@Override
protected MergeTableRegionsState getState(final int stateId) {
- return MergeTableRegionsState.valueOf(stateId);
+ return MergeTableRegionsState.forNumber(stateId);
}
@Override
@@ -613,11 +613,8 @@ public class MergeTableRegionsProcedure
final CacheConfig cacheConf = new CacheConfig(conf, hcd);
for (StoreFileInfo storeFileInfo: storeFiles) {
// Create reference file(s) of the region in mergedDir
- regionFs.mergeStoreFile(
- mergedRegionInfo,
- family,
- new StoreFile(
- mfs.getFileSystem(), storeFileInfo, conf, cacheConf, hcd.getBloomFilterType()),
+ regionFs.mergeStoreFile(mergedRegionInfo, family, new StoreFile(mfs.getFileSystem(),
+ storeFileInfo, conf, cacheConf, hcd.getBloomFilterType(), true),
mergedDir);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitTableRegionProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitTableRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitTableRegionProcedure.java
index 3cd6c66..bf9afd7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitTableRegionProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitTableRegionProcedure.java
@@ -285,7 +285,7 @@ public class SplitTableRegionProcedure
@Override
protected SplitTableRegionState getState(final int stateId) {
- return SplitTableRegionState.valueOf(stateId);
+ return SplitTableRegionState.forNumber(stateId);
}
@Override
@@ -571,9 +571,9 @@ public class SplitTableRegionProcedure
if (storeFiles != null && storeFiles.size() > 0) {
final CacheConfig cacheConf = new CacheConfig(conf, hcd);
for (StoreFileInfo storeFileInfo: storeFiles) {
- StoreFileSplitter sfs = new StoreFileSplitter(regionFs, family.getBytes(),
- new StoreFile(mfs.getFileSystem(), storeFileInfo, conf,
- cacheConf, hcd.getBloomFilterType()));
+ StoreFileSplitter sfs =
+ new StoreFileSplitter(regionFs, family.getBytes(), new StoreFile(mfs.getFileSystem(),
+ storeFileInfo, conf, cacheConf, hcd.getBloomFilterType(), true));
futures.add(threadPool.submit(sfs));
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/CachedMobFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/CachedMobFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/CachedMobFile.java
index 7c4d6fe..90d1f2d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/CachedMobFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/CachedMobFile.java
@@ -44,7 +44,9 @@ public class CachedMobFile extends MobFile implements Comparable<CachedMobFile>
public static CachedMobFile create(FileSystem fs, Path path, Configuration conf,
CacheConfig cacheConf) throws IOException {
- StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE);
+ // XXX: primaryReplica is only used for constructing the block cache key, so passing the wrong
+ // value is not a critical problem; we always pass true here. Needs to be fixed later.
+ StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE, true);
return new CachedMobFile(sf);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java
index cd4c079..73355e8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java
@@ -118,9 +118,7 @@ public class MobFile {
* @throws IOException
*/
public void open() throws IOException {
- if (sf.getReader() == null) {
- sf.createReader();
- }
+ sf.initReader();
}
/**
@@ -146,7 +144,9 @@ public class MobFile {
*/
public static MobFile create(FileSystem fs, Path path, Configuration conf, CacheConfig cacheConf)
throws IOException {
- StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE);
+ // XXX: primaryReplica is only used for constructing the block cache key, so passing the wrong
+ // value is not a critical problem; we always pass true here. Needs to be fixed later.
+ StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE, true);
return new MobFile(sf);
}
}
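
MOB files follow the same pattern as regular store files now: construct the StoreFile with an explicit primaryReplica flag, then open it with the idempotent initReader() instead of the removed createReader() chaining. Roughly, with fs, path, conf, and cacheConf assumed in scope:

    // primaryReplica is always true for MOB files, per the XXX note above;
    // acceptable because the flag only affects block cache keys.
    StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE, true);
    sf.initReader();                        // no-op if the reader is already open
    StoreFileReader reader = sf.getReader();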
http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
index eb75120..06c5001 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
@@ -333,7 +333,8 @@ public final class MobUtils {
if (LOG.isDebugEnabled()) {
LOG.debug(fileName + " is an expired file");
}
- filesToClean.add(new StoreFile(fs, file.getPath(), conf, cacheConfig, BloomType.NONE));
+ filesToClean
+ .add(new StoreFile(fs, file.getPath(), conf, cacheConfig, BloomType.NONE, true));
}
} catch (Exception e) {
LOG.error("Cannot parse the fileName " + fileName, e);
@@ -372,7 +373,7 @@ public final class MobUtils {
Path hbaseDir = new Path(conf.get(HConstants.HBASE_DIR));
Path mobRootDir = new Path(hbaseDir, MobConstants.MOB_DIR_NAME);
FileSystem fs = mobRootDir.getFileSystem(conf);
- return mobRootDir.makeQualified(fs);
+ return mobRootDir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
}
/**
@@ -697,7 +698,7 @@ public final class MobUtils {
return null;
}
Path dstPath = new Path(targetPath, sourceFile.getName());
- validateMobFile(conf, fs, sourceFile, cacheConfig);
+ validateMobFile(conf, fs, sourceFile, cacheConfig, true);
String msg = "Renaming flushed file from " + sourceFile + " to " + dstPath;
LOG.info(msg);
Path parent = dstPath.getParent();
@@ -718,11 +719,11 @@ public final class MobUtils {
* @param cacheConfig The current cache config.
*/
private static void validateMobFile(Configuration conf, FileSystem fs, Path path,
- CacheConfig cacheConfig) throws IOException {
+ CacheConfig cacheConfig, boolean primaryReplica) throws IOException {
StoreFile storeFile = null;
try {
- storeFile = new StoreFile(fs, path, conf, cacheConfig, BloomType.NONE);
- storeFile.createReader();
+ storeFile = new StoreFile(fs, path, conf, cacheConfig, BloomType.NONE, primaryReplica);
+ storeFile.initReader();
} catch (IOException e) {
LOG.error("Failed to open mob file[" + path + "], keep it in temp directory.", e);
throw e;
http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
index 987fe51..05c7076 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
@@ -223,12 +223,9 @@ public class PartitionedMobCompactor extends MobCompactor {
// File in the Del Partition List
// Get delId from the file
- Reader reader = HFile.createReader(fs, linkedFile.getPath(), CacheConfig.DISABLED, conf);
- try {
+ try (Reader reader = HFile.createReader(fs, linkedFile.getPath(), conf)) {
delId.setStartKey(reader.getFirstRowKey());
delId.setEndKey(reader.getLastRowKey());
- } finally {
- reader.close();
}
CompactionDelPartition delPartition = delFilesToCompact.get(delId);
if (delPartition == null) {
@@ -267,12 +264,9 @@ public class PartitionedMobCompactor extends MobCompactor {
if (withDelFiles) {
// get startKey and endKey from the file and update partition
// TODO: is it possible to skip read of most hfiles?
- Reader reader = HFile.createReader(fs, linkedFile.getPath(), CacheConfig.DISABLED, conf);
- try {
+ try (Reader reader = HFile.createReader(fs, linkedFile.getPath(), conf)) {
compactionPartition.setStartKey(reader.getFirstRowKey());
compactionPartition.setEndKey(reader.getLastRowKey());
- } finally {
- reader.close();
}
}
@@ -340,10 +334,11 @@ public class PartitionedMobCompactor extends MobCompactor {
try {
for (CompactionDelPartition delPartition : request.getDelPartitions()) {
for (Path newDelPath : delPartition.listDelFiles()) {
- StoreFile sf = new StoreFile(fs, newDelPath, conf, compactionCacheConfig, BloomType.NONE);
+ StoreFile sf =
+ new StoreFile(fs, newDelPath, conf, compactionCacheConfig, BloomType.NONE, true);
// Pre-create the reader of a del file to avoid a race condition when opening the reader in each
// partition.
- sf.createReader();
+ sf.initReader();
delPartition.addStoreFile(sf);
totalDelFileCount++;
}
@@ -557,7 +552,7 @@ public class PartitionedMobCompactor extends MobCompactor {
List<StoreFile> filesToCompact = new ArrayList<>();
for (int i = offset; i < batch + offset; i++) {
StoreFile sf = new StoreFile(fs, files.get(i).getPath(), conf, compactionCacheConfig,
- BloomType.NONE);
+ BloomType.NONE, true);
filesToCompact.add(sf);
}
filesToCompact.addAll(delFiles);
@@ -739,7 +734,7 @@ public class PartitionedMobCompactor extends MobCompactor {
}
for (int i = offset; i < batch + offset; i++) {
batchedDelFiles.add(new StoreFile(fs, delFilePaths.get(i), conf, compactionCacheConfig,
- BloomType.NONE));
+ BloomType.NONE, true));
}
// compact the del files in a batch.
paths.add(compactDelFilesInBatch(request, batchedDelFiles));
@@ -809,8 +804,8 @@ public class PartitionedMobCompactor extends MobCompactor {
*/
private StoreScanner createScanner(List<StoreFile> filesToCompact, ScanType scanType)
throws IOException {
- List scanners = StoreFileScanner.getScannersForStoreFiles(filesToCompact, false, true, false,
- false, HConstants.LATEST_TIMESTAMP);
+ List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(filesToCompact,
+ false, true, false, false, HConstants.LATEST_TIMESTAMP);
Scan scan = new Scan();
scan.setMaxVersions(column.getMaxVersions());
long ttl = HStore.determineTTLFromFamily(column);
@@ -893,7 +888,8 @@ public class PartitionedMobCompactor extends MobCompactor {
for (StoreFile sf : storeFiles) {
// the readers will be closed later after the merge.
maxSeqId = Math.max(maxSeqId, sf.getMaxSequenceId());
- byte[] count = sf.createReader().loadFileInfo().get(StoreFile.MOB_CELLS_COUNT);
+ sf.initReader();
+ byte[] count = sf.getReader().loadFileInfo().get(StoreFile.MOB_CELLS_COUNT);
if (count != null) {
maxKeyCount += Bytes.toLong(count);
}
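
Two recurring idioms in this file: short-lived HFile readers move to try-with-resources (the three-argument createReader() disables the block cache internally), and metadata lookups go through initReader()/getReader(). A condensed sketch, with fs, path, conf, and a StoreFile sf assumed in scope:

    // Short-lived metadata read; the reader closes automatically on exit.
    try (HFile.Reader reader = HFile.createReader(fs, path, conf)) {
      byte[] firstRow = reader.getFirstRowKey();
      byte[] lastRow = reader.getLastRowKey();
      // ... use the row keys as partition boundaries, as in the hunks above.
    }

    // Long-lived store file: open once, then read file info through the reader.
    sf.initReader();
    byte[] count = sf.getReader().loadFileInfo().get(StoreFile.MOB_CELLS_COUNT);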
http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
index c37ae99..da25df5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
@@ -135,7 +135,7 @@ class DefaultStoreFileManager implements StoreFileManager {
this.compactedfiles = sortCompactedfiles(updatedCompactedfiles);
}
- // Mark the files as compactedAway once the storefiles and compactedfiles list is finalised
+ // Mark the files as compactedAway once the storefiles and compactedfiles list is finalized
// Let a background thread close the actual reader on these compacted files and also
// ensure to evict the blocks from block cache so that they are no longer in
// cache
http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
index b021430..032e383 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
@@ -292,9 +292,9 @@ public class HMobStore extends HStore {
private void validateMobFile(Path path) throws IOException {
StoreFile storeFile = null;
try {
- storeFile =
- new StoreFile(region.getFilesystem(), path, conf, this.mobCacheConfig, BloomType.NONE);
- storeFile.createReader();
+ storeFile = new StoreFile(region.getFilesystem(), path, conf, this.mobCacheConfig,
+ BloomType.NONE, isPrimaryReplicaStore());
+ storeFile.initReader();
} catch (IOException e) {
LOG.error("Fail to open mob file[" + path + "], keep it in temp directory.", e);
throw e;
http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 78ce608..b21a84d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -4160,8 +4160,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
String fakeFamilyName = WALSplitter.getRegionDirRecoveredEditsDir(regiondir).getName();
Set<StoreFile> fakeStoreFiles = new HashSet<>(files.size());
for (Path file: files) {
- fakeStoreFiles.add(new StoreFile(getRegionFileSystem().getFileSystem(), file, this.conf,
- null, null));
+ fakeStoreFiles.add(
+ new StoreFile(getRegionFileSystem().getFileSystem(), file, this.conf, null, null, true));
}
getRegionFileSystem().removeStoreFiles(fakeFamilyName, fakeStoreFiles);
} else {
http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
index 144f43b..014427d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
@@ -294,7 +294,7 @@ public class HRegionFileSystem {
*/
Path getStoreFilePath(final String familyName, final String fileName) {
Path familyDir = getStoreDir(familyName);
- return new Path(familyDir, fileName).makeQualified(this.fs);
+ return new Path(familyDir, fileName).makeQualified(fs.getUri(), fs.getWorkingDirectory());
}
/**
@@ -675,9 +675,7 @@ public class HRegionFileSystem {
if (splitPolicy == null || !splitPolicy.skipStoreFileRangeCheck(familyName)) {
// Check whether the split row lies in the range of the store file
// If it is outside the range, return directly.
- if (f.getReader() == null) {
- f.createReader();
- }
+ f.initReader();
try {
if (top) {
//check if larger than last key.
http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index a98f89e..cbdaa1b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -650,13 +650,11 @@ public class HStore implements Store {
return createStoreFileAndReader(info);
}
- private StoreFile createStoreFileAndReader(final StoreFileInfo info)
- throws IOException {
+ private StoreFile createStoreFileAndReader(final StoreFileInfo info) throws IOException {
info.setRegionCoprocessorHost(this.region.getCoprocessorHost());
StoreFile storeFile = new StoreFile(this.getFileSystem(), info, this.conf, this.cacheConf,
- this.family.getBloomFilterType());
- StoreFileReader r = storeFile.createReader();
- r.setReplicaStoreFile(isPrimaryReplicaStore());
+ this.family.getBloomFilterType(), isPrimaryReplicaStore());
+ storeFile.initReader();
return storeFile;
}
@@ -726,8 +724,8 @@ public class HStore implements Store {
try {
LOG.info("Validating hfile at " + srcPath + " for inclusion in "
+ "store " + this + " region " + this.getRegionInfo().getRegionNameAsString());
- reader = HFile.createReader(srcPath.getFileSystem(conf),
- srcPath, cacheConf, conf);
+ reader = HFile.createReader(srcPath.getFileSystem(conf), srcPath, cacheConf,
+ isPrimaryReplicaStore(), conf);
reader.loadFileInfo();
byte[] firstKey = reader.getFirstRowKey();
@@ -1180,7 +1178,7 @@ public class HStore implements Store {
// but now we get them in ascending order, which I think is
// actually more correct, since memstore get put at the end.
List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(storeFilesToScan,
- cacheBlocks, usePread, isCompaction, false, matcher, readPt, isPrimaryReplicaStore());
+ cacheBlocks, usePread, isCompaction, false, matcher, readPt);
List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1);
scanners.addAll(sfScanners);
// Then the memstore scanners
@@ -1203,7 +1201,7 @@ public class HStore implements Store {
}
}
List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(files,
- cacheBlocks, usePread, isCompaction, false, matcher, readPt, isPrimaryReplicaStore());
+ cacheBlocks, usePread, isCompaction, false, matcher, readPt);
List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1);
scanners.addAll(sfScanners);
// Then the memstore scanners
@@ -2456,8 +2454,9 @@ public class HStore implements Store {
LOG.debug("The file " + file + " was closed but still not archived.");
}
filesToRemove.add(file);
+ continue;
}
- if (r != null && r.isCompactedAway() && !r.isReferencedInReads()) {
+ if (file.isCompactedAway() && !file.isReferencedInReads()) {
// Even if deleting fails we need not bother as any new scanners won't be
// able to use the compacted file as the status is already compactedAway
if (LOG.isTraceEnabled()) {
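
The cleanup loop for compacted files no longer dereferences a possibly-null reader to decide archivability; the state lives on the StoreFile itself. A rough sketch of the resulting shape (the loop variable, filesToRemove, and the closeReader(boolean) call are assumptions based on the surrounding context, not shown verbatim in this hunk):

    for (StoreFile file : compactedFiles) {
      if (file.getReader() == null) {
        // Closed but not archived yet: just collect it and move on.
        filesToRemove.add(file);
        continue;
      }
      if (file.isCompactedAway() && !file.isReferencedInReads()) {
        // Marked compacted away and no scanner holds a reference, so the
        // reader can be closed (true = evict its blocks from the cache).
        file.closeReader(true); // assumed existing helper
        filesToRemove.add(file);
      }
    }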
http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
index 41c13f5..d71af2b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
@@ -54,7 +54,7 @@ class ReversedStoreScanner extends StoreScanner implements KeyValueScanner {
/** Constructor for testing. */
ReversedStoreScanner(final Scan scan, ScanInfo scanInfo, ScanType scanType,
- final NavigableSet<byte[]> columns, final List<KeyValueScanner> scanners)
+ final NavigableSet<byte[]> columns, final List<? extends KeyValueScanner> scanners)
throws IOException {
super(scan, scanInfo, scanType, columns, scanners,
HConstants.LATEST_TIMESTAMP);
http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index 7aef05e..c53fbf08 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -30,6 +30,7 @@ import java.util.Comparator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -54,7 +55,7 @@ import org.apache.hadoop.hbase.util.Bytes;
* and append data. Be sure to add any metadata before calling close on the
* Writer (Use the appendMetadata convenience methods). On close, a StoreFile
* is sitting in the Filesystem. To refer to it, create a StoreFile instance
- * passing filesystem and path. To read, call {@link #createReader()}.
+ * passing filesystem and path. To read, call {@link #initReader()}.
* <p>StoreFiles may also reference store files in another Store.
*
* The reason for this weird pattern where you use a different instance for the
@@ -64,6 +65,10 @@ import org.apache.hadoop.hbase.util.Bytes;
public class StoreFile {
private static final Log LOG = LogFactory.getLog(StoreFile.class.getName());
+ public static final String STORE_FILE_READER_NO_READAHEAD = "hbase.store.reader.no-readahead";
+
+ private static final boolean DEFAULT_STORE_FILE_READER_NO_READAHEAD = false;
+
// Keys for fileinfo values in HFile
/** Max Sequence ID in FileInfo */
@@ -103,6 +108,18 @@ public class StoreFile {
// Block cache configuration and reference.
private final CacheConfig cacheConf;
+ // Counter that is incremented every time a scanner is created on the
+ // store file. It is decremented when the scan on the store file is
+ // done.
+ private final AtomicInteger refCount = new AtomicInteger(0);
+
+ private final boolean noReadahead;
+
+ private final boolean primaryReplica;
+
+ // Indicates if the file got compacted
+ private volatile boolean compactedAway = false;
+
// Keys for metadata stored in backing HFile.
// Set when we obtain a Reader.
private long sequenceid = -1;
@@ -116,7 +133,7 @@ public class StoreFile {
private Cell lastKey;
- private Comparator comparator;
+ private Comparator<Cell> comparator;
CacheConfig getCacheConf() {
return cacheConf;
@@ -130,7 +147,7 @@ public class StoreFile {
return lastKey;
}
- public Comparator getComparator() {
+ public Comparator<Cell> getComparator() {
return comparator;
}
@@ -179,72 +196,96 @@ public class StoreFile {
public static final byte[] SKIP_RESET_SEQ_ID = Bytes.toBytes("SKIP_RESET_SEQ_ID");
/**
- * Constructor, loads a reader and it's indices, etc. May allocate a
- * substantial amount of ram depending on the underlying files (10-20MB?).
- *
- * @param fs The current file system to use.
- * @param p The path of the file.
- * @param conf The current configuration.
- * @param cacheConf The cache configuration and block cache reference.
- * @param cfBloomType The bloom type to use for this store file as specified
- * by column family configuration. This may or may not be the same
- * as the Bloom filter type actually present in the HFile, because
- * column family configuration might change. If this is
+ * Constructor, loads a reader and its indices, etc. May allocate a substantial amount of RAM
+ * depending on the underlying files (10-20MB?).
+ * @param fs The current file system to use.
+ * @param p The path of the file.
+ * @param conf The current configuration.
+ * @param cacheConf The cache configuration and block cache reference.
+ * @param cfBloomType The bloom type to use for this store file as specified by column family
+ * configuration. This may or may not be the same as the Bloom filter type actually
+ * present in the HFile, because column family configuration might change. If this is
* {@link BloomType#NONE}, the existing Bloom filter is ignored.
- * @throws IOException When opening the reader fails.
+ * @deprecated Now we will specify whether the StoreFile is for a primary replica when
+ * constructing, so please use
+ * {@link #StoreFile(FileSystem, Path, Configuration, CacheConfig, BloomType, boolean)}
+ * directly.
*/
+ @Deprecated
public StoreFile(final FileSystem fs, final Path p, final Configuration conf,
- final CacheConfig cacheConf, final BloomType cfBloomType) throws IOException {
+ final CacheConfig cacheConf, final BloomType cfBloomType) throws IOException {
this(fs, new StoreFileInfo(conf, fs, p), conf, cacheConf, cfBloomType);
}
/**
- * Constructor, loads a reader and it's indices, etc. May allocate a
- * substantial amount of ram depending on the underlying files (10-20MB?).
- *
- * @param fs The current file system to use.
- * @param fileInfo The store file information.
- * @param conf The current configuration.
- * @param cacheConf The cache configuration and block cache reference.
- * @param cfBloomType The bloom type to use for this store file as specified
- * by column family configuration. This may or may not be the same
- * as the Bloom filter type actually present in the HFile, because
- * column family configuration might change. If this is
+ * Constructor, loads a reader and its indices, etc. May allocate a substantial amount of RAM
+ * depending on the underlying files (10-20MB?).
+ * @param fs The current file system to use.
+ * @param p The path of the file.
+ * @param conf The current configuration.
+ * @param cacheConf The cache configuration and block cache reference.
+ * @param cfBloomType The bloom type to use for this store file as specified by column family
+ * configuration. This may or may not be the same as the Bloom filter type actually
+ * present in the HFile, because column family configuration might change. If this is
+ * {@link BloomType#NONE}, the existing Bloom filter is ignored.
+ * @param primaryReplica true if this is a store file for primary replica, otherwise false.
+ * @throws IOException When opening the reader fails.
+ */
+ public StoreFile(FileSystem fs, Path p, Configuration conf, CacheConfig cacheConf,
+ BloomType cfBloomType, boolean primaryReplica) throws IOException {
+ this(fs, new StoreFileInfo(conf, fs, p), conf, cacheConf, cfBloomType, primaryReplica);
+ }
+
+ /**
+ * Constructor, loads a reader and its indices, etc. May allocate a substantial amount of RAM
+ * depending on the underlying files (10-20MB?).
+ * @param fs The current file system to use.
+ * @param fileInfo The store file information.
+ * @param conf The current configuration.
+ * @param cacheConf The cache configuration and block cache reference.
+ * @param cfBloomType The bloom type to use for this store file as specified by column family
+ * configuration. This may or may not be the same as the Bloom filter type actually
+ * present in the HFile, because column family configuration might change. If this is
* {@link BloomType#NONE}, the existing Bloom filter is ignored.
- * @throws IOException When opening the reader fails.
+ * @deprecated Now we will specify whether the StoreFile is for a primary replica when
+ * constructing, so please use
+ * {@link #StoreFile(FileSystem, StoreFileInfo, Configuration, CacheConfig, BloomType, boolean)}
+ * directly.
*/
+ @Deprecated
public StoreFile(final FileSystem fs, final StoreFileInfo fileInfo, final Configuration conf,
- final CacheConfig cacheConf, final BloomType cfBloomType) throws IOException {
+ final CacheConfig cacheConf, final BloomType cfBloomType) throws IOException {
+ this(fs, fileInfo, conf, cacheConf, cfBloomType, true);
+ }
+
+ /**
+ * Constructor, loads a reader and its indices, etc. May allocate a substantial amount of RAM
+ * depending on the underlying files (10-20MB?).
+ * @param fs The current file system to use.
+ * @param fileInfo The store file information.
+ * @param conf The current configuration.
+ * @param cacheConf The cache configuration and block cache reference.
+ * @param cfBloomType The bloom type to use for this store file as specified by column
+ * family configuration. This may or may not be the same as the Bloom filter type
+ * actually present in the HFile, because column family configuration might change. If
+ * this is {@link BloomType#NONE}, the existing Bloom filter is ignored.
+ * @param primaryReplica true if this is a store file for primary replica, otherwise false.
+ */
+ public StoreFile(FileSystem fs, StoreFileInfo fileInfo, Configuration conf, CacheConfig cacheConf,
+ BloomType cfBloomType, boolean primaryReplica) {
this.fs = fs;
this.fileInfo = fileInfo;
this.cacheConf = cacheConf;
-
+ this.noReadahead =
+ conf.getBoolean(STORE_FILE_READER_NO_READAHEAD, DEFAULT_STORE_FILE_READER_NO_READAHEAD);
if (BloomFilterFactory.isGeneralBloomEnabled(conf)) {
this.cfBloomType = cfBloomType;
} else {
- LOG.info("Ignoring bloom filter check for file " + this.getPath() + ": " +
- "cfBloomType=" + cfBloomType + " (disabled in config)");
+ LOG.info("Ignoring bloom filter check for file " + this.getPath() + ": " + "cfBloomType=" +
+ cfBloomType + " (disabled in config)");
this.cfBloomType = BloomType.NONE;
}
- }
-
- /**
- * Clone
- * @param other The StoreFile to clone from
- */
- public StoreFile(final StoreFile other) {
- this.fs = other.fs;
- this.fileInfo = other.fileInfo;
- this.cacheConf = other.cacheConf;
- this.cfBloomType = other.cfBloomType;
- this.metadataMap = other.metadataMap;
- }
-
- /**
- * Clone a StoreFile for opening private reader.
- */
- public StoreFile cloneForReader() {
- return new StoreFile(this);
+ this.primaryReplica = primaryReplica;
}
/**
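The deprecated constructors keep existing callers compiling by delegating to the new primaryReplica-aware ones with the old default of true. The same delegation idiom in isolation, under assumed names:

    class Handle {
      private final boolean primaryReplica;

      /** @deprecated Use {@link #Handle(boolean)} and pass the flag explicitly. */
      @Deprecated
      Handle() {
        this(true); // old callers implicitly served the primary replica
      }

      Handle(boolean primaryReplica) {
        this.primaryReplica = primaryReplica;
      }
    }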
@@ -266,12 +307,12 @@ public class StoreFile {
* @return Returns the qualified path of this StoreFile
*/
public Path getQualifiedPath() {
- return this.fileInfo.getPath().makeQualified(fs);
+ return this.fileInfo.getPath().makeQualified(fs.getUri(), fs.getWorkingDirectory());
}
/**
* @return True if this is a StoreFile Reference; call
- * after {@link #open(boolean canUseDropBehind)} else may get wrong answer.
+ * after {@link #open()}; otherwise you may get a wrong answer.
*/
public boolean isReference() {
return this.fileInfo.isReference();
@@ -376,15 +417,21 @@ public class StoreFile {
@VisibleForTesting
public boolean isCompactedAway() {
- if (this.reader != null) {
- return this.reader.isCompactedAway();
- }
- return true;
+ return compactedAway;
}
@VisibleForTesting
public int getRefCount() {
- return this.reader.getRefCount().get();
+ return refCount.get();
+ }
+
+ /**
+ * @return true if the file is still used in reads
+ */
+ public boolean isReferencedInReads() {
+ int rc = refCount.get();
+ assert rc >= 0; // we should not go negative.
+ return rc > 0;
}
/**
@@ -404,18 +451,18 @@ public class StoreFile {
}
/**
- * Opens reader on this store file. Called by Constructor.
- * @return Reader for the store file.
+ * Opens reader on this store file. Called by {@link #initReader()}.
* @throws IOException
* @see #closeReader(boolean)
*/
- private StoreFileReader open(boolean canUseDropBehind) throws IOException {
+ private void open() throws IOException {
if (this.reader != null) {
throw new IllegalAccessError("Already open");
}
// Open the StoreFile.Reader
- this.reader = fileInfo.open(this.fs, this.cacheConf, canUseDropBehind);
+ this.reader = fileInfo.open(this.fs, this.cacheConf, false, noReadahead ? 0L : -1L,
+ primaryReplica, refCount, true);
// Load up indices and fileinfo. This also loads Bloom filter type.
metadataMap = Collections.unmodifiableMap(this.reader.loadFileInfo());
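open() turns the boolean hbase.store.reader.no-readahead setting into the readahead argument of StoreFileInfo.open: 0L switches readahead off for the shared pread reader, while -1L (passed later for stream readers) keeps the filesystem default. A small sketch of that mapping, assuming only Hadoop's Configuration class:

    import org.apache.hadoop.conf.Configuration;

    class ReadaheadSettings {
      static final String STORE_FILE_READER_NO_READAHEAD =
          "hbase.store.reader.no-readahead";

      // Mirrors the noReadahead ? 0L : -1L expression in open():
      // 0L disables readahead, -1L leaves the filesystem default alone.
      static long preadReadahead(Configuration conf) {
        boolean noReadahead =
            conf.getBoolean(STORE_FILE_READER_NO_READAHEAD, false);
        return noReadahead ? 0L : -1L;
      }
    }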
@@ -513,38 +560,45 @@ public class StoreFile {
firstKey = reader.getFirstKey();
lastKey = reader.getLastKey();
comparator = reader.getComparator();
- return this.reader;
- }
-
- public StoreFileReader createReader() throws IOException {
- return createReader(false);
}
/**
- * @return Reader for StoreFile. creates if necessary
- * @throws IOException
+ * Initialize the reader used for pread.
*/
- public StoreFileReader createReader(boolean canUseDropBehind) throws IOException {
- if (this.reader == null) {
+ public void initReader() throws IOException {
+ if (reader == null) {
try {
- this.reader = open(canUseDropBehind);
- } catch (IOException e) {
+ open();
+ } catch (Exception e) {
try {
- boolean evictOnClose =
- cacheConf != null? cacheConf.shouldEvictOnClose(): true;
+ boolean evictOnClose = cacheConf != null ? cacheConf.shouldEvictOnClose() : true;
this.closeReader(evictOnClose);
} catch (IOException ee) {
+ LOG.warn("failed to close reader", ee);
}
throw e;
}
-
}
- return this.reader;
+ }
+
+ private StoreFileReader createStreamReader(boolean canUseDropBehind) throws IOException {
+ initReader();
+ StoreFileReader reader = fileInfo.open(this.fs, this.cacheConf, canUseDropBehind, -1L,
+ primaryReplica, refCount, false);
+ reader.copyFields(this.reader);
+ return reader;
+ }
+
+ public StoreFileScanner getStreamScanner(boolean canUseDropBehind, boolean cacheBlocks,
+ boolean pread, boolean isCompaction, long readPt, long scannerOrder,
+ boolean canOptimizeForNonNullColumn) throws IOException {
+ return createStreamReader(canUseDropBehind).getStoreFileScanner(
+ cacheBlocks, pread, isCompaction, readPt, scannerOrder, canOptimizeForNonNullColumn);
}
/**
- * @return Current reader. Must call createReader first else returns null.
- * @see #createReader()
+ * @return Current reader. Must call initReader first else returns null.
+ * @see #initReader()
*/
public StoreFileReader getReader() {
return this.reader;
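This hunk is the heart of the change: rather than cloning the StoreFile so that a compaction gets its own reader, initReader() lazily opens one shared pread reader, and getStreamScanner() builds a throwaway stream reader that copies the already-parsed metadata from it. A compact sketch of the split, with stand-in types rather than HBase classes (the sketch synchronizes initReader() for simplicity):

    import java.io.IOException;

    class TwoReaderFile {
      private volatile Reader sharedReader; // pread reader, reused by all gets

      // Idempotent: opens the shared reader (and loads metadata) only once.
      synchronized void initReader() throws IOException {
        if (sharedReader == null) {
          sharedReader = Reader.open(true);
        }
      }

      // Per-compaction reader: a private stream, no contention with preads.
      Reader createStreamReader() throws IOException {
        initReader();
        Reader stream = Reader.open(false);
        stream.copyFields(sharedReader); // reuse bloom filters, seq id, timerange
        return stream; // closed by its scanner when the read completes
      }

      static class Reader {
        final boolean shared;
        private Reader(boolean shared) { this.shared = shared; }
        static Reader open(boolean shared) throws IOException {
          return new Reader(shared);
        }
        void copyFields(Reader from) {
          // copy already-loaded metadata instead of re-reading the file
        }
      }
    }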
@@ -566,9 +620,7 @@ public class StoreFile {
* Marks the status of the file as compactedAway.
*/
public void markCompactedAway() {
- if (this.reader != null) {
- this.reader.markCompactedAway();
- }
+ this.compactedAway = true;
}
/**
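markCompactedAway() now just flips a field on the StoreFile instead of reaching into a reader that may not exist yet. A one-class sketch of why volatile suffices for this flag:

    class CompactableFile {
      // Written by the compaction thread, read by whichever thread checks
      // whether the file can be archived; volatile makes the write visible
      // across threads without any locking.
      private volatile boolean compactedAway = false;

      void markCompactedAway() { this.compactedAway = true; }

      boolean isCompactedAway() { return compactedAway; }
    }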
http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
index 3c12045..c4754a8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
@@ -21,17 +21,18 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.HalfStoreFileReader;
@@ -233,25 +234,24 @@ public class StoreFileInfo {
* @param cacheConf The cache configuration and block cache reference.
* @return The StoreFile.Reader for the file
*/
- public StoreFileReader open(final FileSystem fs,
- final CacheConfig cacheConf, final boolean canUseDropBehind) throws IOException {
+ public StoreFileReader open(FileSystem fs, CacheConfig cacheConf, boolean canUseDropBehind,
+ long readahead, boolean isPrimaryReplicaStoreFile, AtomicInteger refCount, boolean shared)
+ throws IOException {
FSDataInputStreamWrapper in;
FileStatus status;
final boolean doDropBehind = canUseDropBehind && cacheConf.shouldDropBehindCompaction();
if (this.link != null) {
// HFileLink
- in = new FSDataInputStreamWrapper(fs, this.link, doDropBehind);
+ in = new FSDataInputStreamWrapper(fs, this.link, doDropBehind, readahead);
status = this.link.getFileStatus(fs);
} else if (this.reference != null) {
// HFile Reference
Path referencePath = getReferredToFile(this.getPath());
- in = new FSDataInputStreamWrapper(fs, referencePath,
- doDropBehind);
+ in = new FSDataInputStreamWrapper(fs, referencePath, doDropBehind, readahead);
status = fs.getFileStatus(referencePath);
} else {
- in = new FSDataInputStreamWrapper(fs, this.getPath(),
- doDropBehind);
+ in = new FSDataInputStreamWrapper(fs, this.getPath(), doDropBehind, readahead);
status = fs.getFileStatus(initialPath);
}
long length = status.getLen();
@@ -265,9 +265,10 @@ public class StoreFileInfo {
if (reader == null) {
if (this.reference != null) {
reader = new HalfStoreFileReader(fs, this.getPath(), in, length, cacheConf, reference,
- conf);
+ isPrimaryReplicaStoreFile, refCount, shared, conf);
} else {
- reader = new StoreFileReader(fs, status.getPath(), in, length, cacheConf, conf);
+ reader = new StoreFileReader(fs, status.getPath(), in, length, cacheConf,
+ isPrimaryReplicaStoreFile, refCount, shared, conf);
}
}
if (this.coprocessorHost != null) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
index 8f01a93..b015ea5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
@@ -18,6 +18,8 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import com.google.common.annotations.VisibleForTesting;
+
import java.io.DataInput;
import java.io.IOException;
import java.util.Map;
@@ -34,7 +36,6 @@ import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
@@ -68,36 +69,47 @@ public class StoreFileReader {
private KeyValue.KeyOnlyKeyValue lastBloomKeyOnlyKV = null;
private boolean skipResetSeqId = true;
- public AtomicInteger getRefCount() {
- return refCount;
- }
-
// Counter that is incremented every time a scanner is created on the
- // store file. It is decremented when the scan on the store file is
- // done.
- private AtomicInteger refCount = new AtomicInteger(0);
- // Indicates if the file got compacted
- private volatile boolean compactedAway = false;
+ // store file. It is decremented when the scan on the store file is
+ // done. All StoreFileReader for the same StoreFile will share this counter.
+ private final AtomicInteger refCount;
- public StoreFileReader(FileSystem fs, Path path, CacheConfig cacheConf, Configuration conf)
- throws IOException {
- reader = HFile.createReader(fs, path, cacheConf, conf);
+ // Indicates whether this StoreFileReader is shared, i.e., used for pread. If not, we will
+ // close the internal reader when readCompleted is called.
+ private final boolean shared;
+
+ private StoreFileReader(HFile.Reader reader, AtomicInteger refCount, boolean shared) {
+ this.reader = reader;
bloomFilterType = BloomType.NONE;
+ this.refCount = refCount;
+ this.shared = shared;
}
- void markCompactedAway() {
- this.compactedAway = true;
+ public StoreFileReader(FileSystem fs, Path path, CacheConfig cacheConf,
+ boolean primaryReplicaStoreFile, AtomicInteger refCount, boolean shared, Configuration conf)
+ throws IOException {
+ this(HFile.createReader(fs, path, cacheConf, primaryReplicaStoreFile, conf), refCount, shared);
}
public StoreFileReader(FileSystem fs, Path path, FSDataInputStreamWrapper in, long size,
- CacheConfig cacheConf, Configuration conf) throws IOException {
- reader = HFile.createReader(fs, path, in, size, cacheConf, conf);
- bloomFilterType = BloomType.NONE;
+ CacheConfig cacheConf, boolean primaryReplicaStoreFile, AtomicInteger refCount,
+ boolean shared, Configuration conf) throws IOException {
+ this(HFile.createReader(fs, path, in, size, cacheConf, primaryReplicaStoreFile, conf), refCount,
+ shared);
}
- public void setReplicaStoreFile(boolean isPrimaryReplicaStoreFile) {
- reader.setPrimaryReplicaReader(isPrimaryReplicaStoreFile);
+ void copyFields(StoreFileReader reader) {
+ this.generalBloomFilter = reader.generalBloomFilter;
+ this.deleteFamilyBloomFilter = reader.deleteFamilyBloomFilter;
+ this.bloomFilterType = reader.bloomFilterType;
+ this.sequenceID = reader.sequenceID;
+ this.timeRange = reader.timeRange;
+ this.lastBloomKey = reader.lastBloomKey;
+ this.bulkLoadResult = reader.bulkLoadResult;
+ this.lastBloomKeyOnlyKV = reader.lastBloomKeyOnlyKV;
+ this.skipResetSeqId = reader.skipResetSeqId;
}
+
public boolean isPrimaryReplicaReader() {
return reader.isPrimaryReplicaReader();
}
@@ -105,8 +117,11 @@ public class StoreFileReader {
/**
* ONLY USE DEFAULT CONSTRUCTOR FOR UNIT TESTS
*/
+ @VisibleForTesting
StoreFileReader() {
+ this.refCount = new AtomicInteger(0);
this.reader = null;
+ this.shared = false;
}
public CellComparator getComparator() {
@@ -128,30 +143,23 @@ public class StoreFileReader {
boolean isCompaction, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn) {
// Increment the ref count
refCount.incrementAndGet();
- return new StoreFileScanner(this, getScanner(cacheBlocks, pread, isCompaction), !isCompaction,
- reader.hasMVCCInfo(), readPt, scannerOrder, canOptimizeForNonNullColumn);
+ return new StoreFileScanner(this, getScanner(cacheBlocks, pread, isCompaction),
+ !isCompaction, reader.hasMVCCInfo(), readPt, scannerOrder, canOptimizeForNonNullColumn);
}
/**
- * Decrement the ref count associated with the reader when ever a scanner associated
- * with the reader is closed
+ * Indicates that the scanner has finished reading with this reader. We need to decrement the ref
+ * count, and also, if this is not the common pread reader, we should close it.
*/
- void decrementRefCount() {
+ void readCompleted() {
refCount.decrementAndGet();
- }
-
- /**
- * @return true if the file is still used in reads
- */
- public boolean isReferencedInReads() {
- return refCount.get() != 0;
- }
-
- /**
- * @return true if the file is compacted
- */
- public boolean isCompactedAway() {
- return this.compactedAway;
+ if (!shared) {
+ try {
+ reader.close(false);
+ } catch (IOException e) {
+ LOG.warn("failed to close stream reader", e);
+ }
+ }
}
/**
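readCompleted() replaces decrementRefCount(): both reader flavours release the shared count, but only a private stream reader also closes its underlying HFile reader, since no one else holds it. The same logic in isolation, under assumed names:

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicInteger;

    class CountedReader {
      private final AtomicInteger refCount; // shared with the owning file
      private final boolean shared;         // pread reader vs private stream reader
      private final Closeable hfileReader;

      CountedReader(AtomicInteger refCount, boolean shared, Closeable hfileReader) {
        this.refCount = refCount;
        this.shared = shared;
        this.hfileReader = hfileReader;
      }

      void readCompleted() {
        refCount.decrementAndGet();
        if (!shared) {
          try {
            hfileReader.close(); // a stream reader serves exactly one scan
          } catch (IOException e) {
            // warn and carry on, as the patch does
          }
        }
      }
    }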
http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index ab6b0ef..aa4f897 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -31,8 +31,6 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.TimeRange;
@@ -124,26 +122,44 @@ public class StoreFileScanner implements KeyValueScanner {
*/
public static List<StoreFileScanner> getScannersForStoreFiles(Collection<StoreFile> files,
boolean cacheBlocks, boolean usePread, boolean isCompaction, boolean canUseDrop,
- ScanQueryMatcher matcher, long readPt, boolean isPrimaryReplica) throws IOException {
+ ScanQueryMatcher matcher, long readPt) throws IOException {
List<StoreFileScanner> scanners = new ArrayList<>(files.size());
- List<StoreFile> sorted_files = new ArrayList<>(files);
- Collections.sort(sorted_files, StoreFile.Comparators.SEQ_ID);
- for (int i = 0; i < sorted_files.size(); i++) {
- StoreFileReader r = sorted_files.get(i).createReader(canUseDrop);
- r.setReplicaStoreFile(isPrimaryReplica);
- StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread, isCompaction, readPt,
- i, matcher != null ? !matcher.hasNullColumnInQuery() : false);
+ List<StoreFile> sortedFiles = new ArrayList<>(files);
+ Collections.sort(sortedFiles, StoreFile.Comparators.SEQ_ID);
+ for (int i = 0, n = sortedFiles.size(); i < n; i++) {
+ StoreFile sf = sortedFiles.get(i);
+ sf.initReader();
+ StoreFileScanner scanner = sf.getReader().getStoreFileScanner(cacheBlocks, usePread,
+ isCompaction, readPt, i, matcher != null ? !matcher.hasNullColumnInQuery() : false);
scanners.add(scanner);
}
return scanners;
}
- public static List<StoreFileScanner> getScannersForStoreFiles(
- Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
- boolean isCompaction, boolean canUseDrop,
- ScanQueryMatcher matcher, long readPt) throws IOException {
- return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction, canUseDrop,
- matcher, readPt, true);
+ /**
+ * Get scanners for compaction. We will create a separate reader for each store file to avoid
+ * contention with normal read requests.
+ */
+ public static List<StoreFileScanner> getScannersForCompaction(Collection<StoreFile> files,
+ boolean canUseDropBehind, long readPt) throws IOException {
+ List<StoreFileScanner> scanners = new ArrayList<>(files.size());
+ List<StoreFile> sortedFiles = new ArrayList<>(files);
+ Collections.sort(sortedFiles, StoreFile.Comparators.SEQ_ID);
+ boolean succ = false;
+ try {
+ for (int i = 0, n = sortedFiles.size(); i < n; i++) {
+ scanners.add(sortedFiles.get(i).getStreamScanner(canUseDropBehind, false, false, true,
+ readPt, i, false));
+ }
+ succ = true;
+ } finally {
+ if (!succ) {
+ for (StoreFileScanner scanner : scanners) {
+ scanner.close();
+ }
+ }
+ }
+ return scanners;
}
public String toString() {
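getScannersForCompaction is careful not to leak stream readers: if opening the i-th scanner throws, the i scanners already created are closed before the exception propagates. The idiom in isolation, with a hypothetical factory interface:

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    class OpenAll {
      interface Factory {
        Closeable open() throws IOException;
      }

      static List<Closeable> openAll(List<Factory> factories) throws IOException {
        List<Closeable> opened = new ArrayList<>(factories.size());
        boolean succ = false;
        try {
          for (Factory f : factories) {
            opened.add(f.open());
          }
          succ = true;
        } finally {
          if (!succ) {
            // Close the partial set before the original exception propagates.
            // (Closing inside finally can mask that exception if close()
            // itself throws; StoreFileScanner.close() does not.)
            for (Closeable c : opened) {
              c.close();
            }
          }
        }
        return opened;
      }
    }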
@@ -262,7 +278,7 @@ public class StoreFileScanner implements KeyValueScanner {
cur = null;
this.hfs.close();
if (this.reader != null) {
- this.reader.decrementRefCount();
+ this.reader.readCompleted();
}
closed = true;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 99ec30e..3bc6a0f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -312,7 +312,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
@VisibleForTesting
StoreScanner(final Scan scan, ScanInfo scanInfo,
ScanType scanType, final NavigableSet<byte[]> columns,
- final List<KeyValueScanner> scanners) throws IOException {
+ final List<? extends KeyValueScanner> scanners) throws IOException {
this(scan, scanInfo, scanType, columns, scanners,
HConstants.LATEST_TIMESTAMP,
// 0 is passed as readpoint because the test bypasses Store
@@ -322,7 +322,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
@VisibleForTesting
StoreScanner(final Scan scan, ScanInfo scanInfo,
ScanType scanType, final NavigableSet<byte[]> columns,
- final List<KeyValueScanner> scanners, long earliestPutTs)
+ final List<? extends KeyValueScanner> scanners, long earliestPutTs)
throws IOException {
this(scan, scanInfo, scanType, columns, scanners, earliestPutTs,
// 0 is passed as readpoint because the test bypasses Store
@@ -330,7 +330,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
}
public StoreScanner(final Scan scan, ScanInfo scanInfo, ScanType scanType,
- final NavigableSet<byte[]> columns, final List<KeyValueScanner> scanners, long earliestPutTs,
+ final NavigableSet<byte[]> columns, final List<? extends KeyValueScanner> scanners, long earliestPutTs,
long readPt) throws IOException {
this(null, scan, scanInfo, columns, readPt,
scanType == ScanType.USER_SCAN ? scan.getCacheBlocks() : false);
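The signature widening to List<? extends KeyValueScanner> in these constructors matters because Java generics are invariant: a List<StoreFileScanner> is not a List<KeyValueScanner>, so without the wildcard callers would have to copy or unsafely cast their lists. A self-contained illustration:

    import java.util.ArrayList;
    import java.util.List;

    class Variance {
      interface KeyValueScanner {}
      static class StoreFileScanner implements KeyValueScanner {}

      // Accepts List<KeyValueScanner>, List<StoreFileScanner>, and so on.
      static void scan(List<? extends KeyValueScanner> scanners) {
        for (KeyValueScanner s : scanners) {
          // read-only use; nothing can be added through a List<? extends ...>
        }
      }

      public static void main(String[] args) {
        List<StoreFileScanner> scanners = new ArrayList<>();
        scanners.add(new StoreFileScanner());
        scan(scanners); // compiles only because of the bounded wildcard
      }
    }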