You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2017/05/11 03:29:10 UTC
hbase git commit: HBASE-17917 Use pread by default for all user scan
Repository: hbase
Updated Branches:
refs/heads/master c5cc81d8e -> 0ae0edcd6
HBASE-17917 Use pread by default for all user scan
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/0ae0edcd
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/0ae0edcd
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/0ae0edcd
Branch: refs/heads/master
Commit: 0ae0edcd630aa1dcb6c47ea11fa4367ae0a5baa8
Parents: c5cc81d
Author: zhangduo <zh...@apache.org>
Authored: Wed May 10 14:15:44 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Thu May 11 11:28:22 2017 +0800
----------------------------------------------------------------------
.../hadoop/hbase/io/hfile/HFileBlock.java | 70 ++---
.../hadoop/hbase/regionserver/KeyValueHeap.java | 3 +
.../hbase/regionserver/KeyValueScanner.java | 7 +
.../regionserver/NonLazyKeyValueScanner.java | 9 +
.../regionserver/ReversedStoreScanner.java | 12 +-
.../hadoop/hbase/regionserver/ScanInfo.java | 17 +-
.../hbase/regionserver/SegmentScanner.java | 7 +-
.../hadoop/hbase/regionserver/StoreFile.java | 14 +-
.../hbase/regionserver/StoreFileInfo.java | 15 +
.../hbase/regionserver/StoreFileReader.java | 3 +-
.../hbase/regionserver/StoreFileScanner.java | 42 +--
.../hadoop/hbase/regionserver/StoreScanner.java | 279 ++++++++++++-------
.../org/apache/hadoop/hbase/TestIOFencing.java | 79 +++---
.../hadoop/hbase/io/hfile/TestHFileBlock.java | 3 +-
.../regionserver/DelegatingKeyValueScanner.java | 6 +
.../hbase/regionserver/MockStoreFile.java | 13 +-
.../hbase/regionserver/TestBlocksScanned.java | 6 +-
.../regionserver/TestCompactingMemStore.java | 2 +-
.../hbase/regionserver/TestCompaction.java | 6 +-
.../TestDefaultCompactSelection.java | 4 +-
.../hbase/regionserver/TestDefaultMemStore.java | 7 +-
.../hbase/regionserver/TestMajorCompaction.java | 6 +-
.../regionserver/TestReversibleScanners.java | 5 +-
.../hbase/regionserver/TestStoreScanner.java | 13 +-
.../regionserver/TestSwitchToStreamRead.java | 127 +++++++++
.../TestCompactionScanQueryMatcher.java | 4 +-
.../querymatcher/TestUserScanQueryMatcher.java | 23 +-
.../hbase/util/TestCoprocessorScanPolicy.java | 21 +-
28 files changed, 541 insertions(+), 262 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index 445dc86..1e86b0b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hbase.io.hfile;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
@@ -24,8 +27,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -53,9 +54,6 @@ import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.IOUtils;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
/**
* Reads {@link HFile} version 2 blocks to HFiles and via {@link Cacheable} Interface to caches.
* Version 2 was introduced in hbase-0.92.0. No longer has support for version 1 blocks since
@@ -1418,7 +1416,7 @@ public class HFileBlock implements Cacheable {
static class FSReaderImpl implements FSReader {
/** The file system stream of the underlying {@link HFile} that
* does or doesn't do checksum validations in the filesystem */
- protected FSDataInputStreamWrapper streamWrapper;
+ private FSDataInputStreamWrapper streamWrapper;
private HFileBlockDecodingContext encodedBlockDecodingCtx;
@@ -1434,22 +1432,18 @@ public class HFileBlock implements Cacheable {
private AtomicReference<PrefetchedHeader> prefetchedHeader = new AtomicReference<>(new PrefetchedHeader());
/** The size of the file we are reading from, or -1 if unknown. */
- protected long fileSize;
+ private long fileSize;
/** The size of the header */
+ @VisibleForTesting
protected final int hdrSize;
/** The filesystem used to access data */
- protected HFileSystem hfs;
-
- private final Lock streamLock = new ReentrantLock();
+ private HFileSystem hfs;
- /** The default buffer size for our buffered streams */
- public static final int DEFAULT_BUFFER_SIZE = 1 << 20;
-
- protected HFileContext fileContext;
+ private HFileContext fileContext;
// Cache the fileName
- protected String pathName;
+ private String pathName;
FSReaderImpl(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path,
HFileContext fileContext) throws IOException {
@@ -1524,39 +1518,33 @@ public class HFileBlock implements Cacheable {
* next header
* @throws IOException
*/
- protected int readAtOffset(FSDataInputStream istream, byte [] dest, int destOffset, int size,
+ @VisibleForTesting
+ protected int readAtOffset(FSDataInputStream istream, byte[] dest, int destOffset, int size,
boolean peekIntoNextBlock, long fileOffset, boolean pread) throws IOException {
if (peekIntoNextBlock && destOffset + size + hdrSize > dest.length) {
// We are asked to read the next block's header as well, but there is
// not enough room in the array.
- throw new IOException("Attempted to read " + size + " bytes and " +
- hdrSize + " bytes of next header into a " + dest.length +
- "-byte array at offset " + destOffset);
+ throw new IOException("Attempted to read " + size + " bytes and " + hdrSize +
+ " bytes of next header into a " + dest.length + "-byte array at offset " + destOffset);
}
- if (!pread && streamLock.tryLock()) {
+ if (!pread) {
// Seek + read. Better for scanning.
- try {
- HFileUtil.seekOnMultipleSources(istream, fileOffset);
-
- long realOffset = istream.getPos();
- if (realOffset != fileOffset) {
- throw new IOException("Tried to seek to " + fileOffset + " to "
- + "read " + size + " bytes, but pos=" + realOffset
- + " after seek");
- }
+ HFileUtil.seekOnMultipleSources(istream, fileOffset);
+ long realOffset = istream.getPos();
+ if (realOffset != fileOffset) {
+ throw new IOException("Tried to seek to " + fileOffset + " to " + "read " + size +
+ " bytes, but pos=" + realOffset + " after seek");
+ }
- if (!peekIntoNextBlock) {
- IOUtils.readFully(istream, dest, destOffset, size);
- return -1;
- }
+ if (!peekIntoNextBlock) {
+ IOUtils.readFully(istream, dest, destOffset, size);
+ return -1;
+ }
- // Try to read the next block header.
- if (!readWithExtra(istream, dest, destOffset, size, hdrSize)) {
- return -1;
- }
- } finally {
- streamLock.unlock();
+ // Try to read the next block header.
+ if (!readWithExtra(istream, dest, destOffset, size, hdrSize)) {
+ return -1;
}
} else {
// Positional read. Better for random reads; or when the streamLock is already locked.
@@ -1565,7 +1553,6 @@ public class HFileBlock implements Cacheable {
return -1;
}
}
-
assert peekIntoNextBlock;
return Bytes.toInt(dest, destOffset + size + BlockType.MAGIC_LENGTH) + hdrSize;
}
@@ -1719,6 +1706,7 @@ public class HFileBlock implements Cacheable {
* If HBase checksum is switched off, then use HDFS checksum.
* @return the HFileBlock or null if there is a HBase checksum mismatch
*/
+ @VisibleForTesting
protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum)
throws IOException {
@@ -1830,7 +1818,7 @@ public class HFileBlock implements Cacheable {
* If the block doesn't uses checksum, returns false.
* @return True if checksum matches, else false.
*/
- protected boolean validateChecksum(long offset, ByteBuffer data, int hdrSize)
+ private boolean validateChecksum(long offset, ByteBuffer data, int hdrSize)
throws IOException {
// If this is an older version of the block that does not have checksums, then return false
// indicating that checksum verification did not succeed. Actually, this method should never
http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
index 195e8f7..a398ce9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.hbase.regionserver;
+import com.google.common.annotations.VisibleForTesting;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
@@ -422,6 +424,7 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
return 0;
}
+ @VisibleForTesting
KeyValueScanner getCurrentForTesting() {
return current;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
index a4cb2f4..7f716d6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
@@ -22,6 +22,7 @@ import java.io.Closeable;
import java.io.IOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
@@ -134,6 +135,12 @@ public interface KeyValueScanner extends Shipper, Closeable {
*/
boolean isFileScanner();
+ /**
+ * @return the file path if this is a file scanner, otherwise null.
+ * @see #isFileScanner()
+ */
+ Path getFilePath();
+
// Support for "Reversed Scanner"
/**
* Seek the scanner at or before the row of specified Cell, it firstly
http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java
index c01b0e6..8778f5d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Scan;
@@ -65,6 +66,14 @@ public abstract class NonLazyKeyValueScanner implements KeyValueScanner {
// Not a file by default.
return false;
}
+
+
+ @Override
+ public Path getFilePath() {
+ // Not a file by default.
+ return null;
+ }
+
@Override
public Cell getNextIndexedKey() {
return null;
http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
index d71af2b..07f98ad 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
@@ -123,15 +123,17 @@ class ReversedStoreScanner extends StoreScanner implements KeyValueScanner {
@Override
public boolean seekToPreviousRow(Cell key) throws IOException {
- boolean flushed = checkFlushed();
- checkReseek(flushed);
+ if (checkFlushed()) {
+ reopenAfterFlush();
+ }
return this.heap.seekToPreviousRow(key);
}
-
+
@Override
public boolean backwardSeek(Cell key) throws IOException {
- boolean flushed = checkFlushed();
- checkReseek(flushed);
+ if (checkFlushed()) {
+ reopenAfterFlush();
+ }
return this.heap.backwardSeek(key);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java
index 349e166..2a66e55 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java
@@ -48,6 +48,7 @@ public class ScanInfo {
private boolean usePread;
private long cellsPerTimeoutCheck;
private boolean parallelSeekEnabled;
+ private final long preadMaxBytes;
private final Configuration conf;
public static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
@@ -58,14 +59,14 @@ public class ScanInfo {
* @param conf
* @param family {@link HColumnDescriptor} describing the column family
* @param ttl Store's TTL (in ms)
- * @param timeToPurgeDeletes duration in ms after which a delete marker can
- * be purged during a major compaction.
+ * @param timeToPurgeDeletes duration in ms after which a delete marker can be purged during a
+ * major compaction.
* @param comparator The store's comparator
*/
public ScanInfo(final Configuration conf, final HColumnDescriptor family, final long ttl,
final long timeToPurgeDeletes, final CellComparator comparator) {
- this(conf, family.getName(), family.getMinVersions(), family.getMaxVersions(), ttl, family
- .getKeepDeletedCells(), timeToPurgeDeletes, comparator);
+ this(conf, family.getName(), family.getMinVersions(), family.getMaxVersions(), ttl,
+ family.getKeepDeletedCells(), family.getBlocksize(), timeToPurgeDeletes, comparator);
}
/**
@@ -74,6 +75,7 @@ public class ScanInfo {
* @param minVersions Store's MIN_VERSIONS setting
* @param maxVersions Store's VERSIONS setting
* @param ttl Store's TTL (in ms)
+ * @param blockSize Store's block size
* @param timeToPurgeDeletes duration in ms after which a delete marker can
* be purged during a major compaction.
* @param keepDeletedCells Store's keepDeletedCells setting
@@ -81,7 +83,7 @@ public class ScanInfo {
*/
public ScanInfo(final Configuration conf, final byte[] family, final int minVersions,
final int maxVersions, final long ttl, final KeepDeletedCells keepDeletedCells,
- final long timeToPurgeDeletes, final CellComparator comparator) {
+ final long blockSize, final long timeToPurgeDeletes, final CellComparator comparator) {
this.family = family;
this.minVersions = minVersions;
this.maxVersions = maxVersions;
@@ -99,6 +101,7 @@ public class ScanInfo {
perHeartbeat: StoreScanner.DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK;
this.parallelSeekEnabled =
conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false);
+ this.preadMaxBytes = conf.getLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 4 * blockSize);
this.conf = conf;
}
@@ -149,4 +152,8 @@ public class ScanInfo {
public CellComparator getComparator() {
return comparator;
}
+
+ long getPreadMaxBytes() {
+ return preadMaxBytes;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
index 2727360..08ded88 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
import java.util.SortedSet;
import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -316,6 +317,11 @@ public class SegmentScanner implements KeyValueScanner {
return false;
}
+ @Override
+ public Path getFilePath() {
+ return null;
+ }
+
/**
* @return the next key in the index (the key to seek to the next block)
* if known, or null otherwise
@@ -396,5 +402,4 @@ public class SegmentScanner implements KeyValueScanner {
}
return (first != null ? first : second);
}
-
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index c53fbf08..91ff97a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -589,11 +589,17 @@ public class StoreFile {
return reader;
}
+ public StoreFileScanner getPreadScanner(boolean cacheBlocks, long readPt, long scannerOrder,
+ boolean canOptimizeForNonNullColumn) {
+ return getReader().getStoreFileScanner(cacheBlocks, true, false, readPt, scannerOrder,
+ canOptimizeForNonNullColumn);
+ }
+
public StoreFileScanner getStreamScanner(boolean canUseDropBehind, boolean cacheBlocks,
- boolean pread, boolean isCompaction, long readPt, long scannerOrder,
- boolean canOptimizeForNonNullColumn) throws IOException {
- return createStreamReader(canUseDropBehind).getStoreFileScanner(
- cacheBlocks, pread, isCompaction, readPt, scannerOrder, canOptimizeForNonNullColumn);
+ boolean isCompaction, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn)
+ throws IOException {
+ return createStreamReader(canUseDropBehind).getStoreFileScanner(cacheBlocks, false,
+ isCompaction, readPt, scannerOrder, canOptimizeForNonNullColumn);
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
index c4754a8..0e99c74 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
@@ -564,4 +564,19 @@ public class StoreFileInfo {
hash = hash * 31 + ((link == null) ? 0 : link.hashCode());
return hash;
}
+
+ /**
+ * Return the active file name that contains the real data.
+ * <p>
+ * For referenced hfile, we will return the name of the reference file as it will be used to
+ * construct the StoreFileReader. And for linked hfile, we will return the name of the file being
+ * linked.
+ */
+ public String getActiveFileName() {
+ if (reference != null || link == null) {
+ return initialPath.getName();
+ } else {
+ return HFileLink.getReferencedHFileName(initialPath.getName());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
index b015ea5..ee7d132 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
@@ -76,7 +76,8 @@ public class StoreFileReader {
// indicate that whether this StoreFileReader is shared, i.e., used for pread. If not, we will
// close the internal reader when readCompleted is called.
- private final boolean shared;
+ @VisibleForTesting
+ final boolean shared;
private StoreFileReader(HFile.Reader reader, AtomicInteger refCount, boolean shared) {
this.reader = reader;
http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index aa4f897..42c2af2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -27,6 +27,7 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.LongAdder;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
@@ -95,25 +96,21 @@ public class StoreFileScanner implements KeyValueScanner {
}
/**
- * Return an array of scanners corresponding to the given
- * set of store files.
+ * Return an array of scanners corresponding to the given set of store files.
*/
- public static List<StoreFileScanner> getScannersForStoreFiles(
- Collection<StoreFile> files,
- boolean cacheBlocks,
- boolean usePread, long readPt) throws IOException {
- return getScannersForStoreFiles(files, cacheBlocks,
- usePread, false, false, readPt);
+ public static List<StoreFileScanner> getScannersForStoreFiles(Collection<StoreFile> files,
+ boolean cacheBlocks, boolean usePread, long readPt) throws IOException {
+ return getScannersForStoreFiles(files, cacheBlocks, usePread, false, false, readPt);
}
/**
* Return an array of scanners corresponding to the given set of store files.
*/
- public static List<StoreFileScanner> getScannersForStoreFiles(
- Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
- boolean isCompaction, boolean useDropBehind, long readPt) throws IOException {
- return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction,
- useDropBehind, null, readPt);
+ public static List<StoreFileScanner> getScannersForStoreFiles(Collection<StoreFile> files,
+ boolean cacheBlocks, boolean usePread, boolean isCompaction, boolean useDropBehind,
+ long readPt) throws IOException {
+ return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction, useDropBehind, null,
+ readPt);
}
/**
@@ -126,11 +123,17 @@ public class StoreFileScanner implements KeyValueScanner {
List<StoreFileScanner> scanners = new ArrayList<>(files.size());
List<StoreFile> sortedFiles = new ArrayList<>(files);
Collections.sort(sortedFiles, StoreFile.Comparators.SEQ_ID);
+ boolean canOptimizeForNonNullColumn = matcher != null ? !matcher.hasNullColumnInQuery() : false;
for (int i = 0, n = sortedFiles.size(); i < n; i++) {
StoreFile sf = sortedFiles.get(i);
sf.initReader();
- StoreFileScanner scanner = sf.getReader().getStoreFileScanner(cacheBlocks, usePread,
- isCompaction, readPt, i, matcher != null ? !matcher.hasNullColumnInQuery() : false);
+ StoreFileScanner scanner;
+ if (usePread) {
+ scanner = sf.getPreadScanner(cacheBlocks, readPt, i, canOptimizeForNonNullColumn);
+ } else {
+ scanner = sf.getStreamScanner(canUseDrop, cacheBlocks, isCompaction, readPt, i,
+ canOptimizeForNonNullColumn);
+ }
scanners.add(scanner);
}
return scanners;
@@ -148,8 +151,8 @@ public class StoreFileScanner implements KeyValueScanner {
boolean succ = false;
try {
for (int i = 0, n = sortedFiles.size(); i < n; i++) {
- scanners.add(sortedFiles.get(i).getStreamScanner(canUseDropBehind, false, false, true,
- readPt, i, false));
+ scanners.add(
+ sortedFiles.get(i).getStreamScanner(canUseDropBehind, false, true, readPt, i, false));
}
succ = true;
} finally {
@@ -444,6 +447,11 @@ public class StoreFileScanner implements KeyValueScanner {
return true;
}
+ @Override
+ public Path getFilePath() {
+ return reader.getHFileReader().getPath();
+ }
+
// Test methods
static final long getSeekCount() {
return seekCount.sum();
http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index d39a6ee..338a68c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -24,7 +24,9 @@ import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.NavigableSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;
@@ -53,8 +55,12 @@ import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
- * Scanner scans both the memstore and the Store. Coalesce KeyValue stream
- * into List<KeyValue> for a single row.
+ * Scanner scans both the memstore and the Store. Coalesce KeyValue stream into List<KeyValue>
+ * for a single row.
+ * <p>
+ * The implementation is not thread safe. So there will be no race between next and close. The only
+ * exception is updateReaders, it will be called in the memstore flush thread to indicate that there
+ * is a flush.
*/
@InterfaceAudience.Private
public class StoreScanner extends NonReversedNonLazyKeyValueScanner
@@ -62,36 +68,35 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
private static final Log LOG = LogFactory.getLog(StoreScanner.class);
// In unit tests, the store could be null
protected final Store store;
- protected ScanQueryMatcher matcher;
+ private ScanQueryMatcher matcher;
protected KeyValueHeap heap;
- protected boolean cacheBlocks;
+ private boolean cacheBlocks;
- protected long countPerRow = 0;
- protected int storeLimit = -1;
- protected int storeOffset = 0;
+ private long countPerRow = 0;
+ private int storeLimit = -1;
+ private int storeOffset = 0;
// Used to indicate that the scanner has closed (see HBASE-1107)
- // Doesnt need to be volatile because it's always accessed via synchronized methods
- protected boolean closing = false;
- protected final boolean get;
- protected final boolean explicitColumnQuery;
- protected final boolean useRowColBloom;
+ // Do not need to be volatile because it's always accessed via synchronized methods
+ private boolean closing = false;
+ private final boolean get;
+ private final boolean explicitColumnQuery;
+ private final boolean useRowColBloom;
/**
* A flag that enables StoreFileScanner parallel-seeking
*/
- protected boolean parallelSeekEnabled = false;
- protected ExecutorService executor;
- protected final Scan scan;
- protected final NavigableSet<byte[]> columns;
- protected final long oldestUnexpiredTS;
- protected final long now;
- protected final int minVersions;
- protected final long maxRowSize;
- protected final long cellsPerHeartbeatCheck;
+ private boolean parallelSeekEnabled = false;
+ private ExecutorService executor;
+ private final Scan scan;
+ private final long oldestUnexpiredTS;
+ private final long now;
+ private final int minVersions;
+ private final long maxRowSize;
+ private final long cellsPerHeartbeatCheck;
// Collects all the KVHeap that are eagerly getting closed during the
// course of a scan
- protected List<KeyValueHeap> heapsForDelayedClose = new ArrayList<>();
+ private final List<KeyValueHeap> heapsForDelayedClose = new ArrayList<>();
/**
* The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not
@@ -100,14 +105,16 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
private long kvsScanned = 0;
private Cell prevCell = null;
+ private final long preadMaxBytes;
+ private long bytesRead;
+
/** We don't ever expect to change this, the constant is just for clarity. */
static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =
"hbase.storescanner.parallel.seek.enable";
/** Used during unit testing to ensure that lazy seek does save seek ops */
- protected static boolean lazySeekEnabledGlobally =
- LAZY_SEEK_ENABLED_BY_DEFAULT;
+ private static boolean lazySeekEnabledGlobally = LAZY_SEEK_ENABLED_BY_DEFAULT;
/**
* The number of cells scanned in between timeout checks. Specifying a larger value means that
@@ -122,19 +129,26 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
*/
public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000;
- // if heap == null and lastTop != null, you need to reseek given the key below
- protected Cell lastTop = null;
+ /**
+ * If the read type if Scan.ReadType.DEFAULT, we will start with pread, and if the kvs we scanned
+ * reaches this limit, we will reopen the scanner with stream. The default value is 4 times of
+ * block size for this store.
+ */
+ public static final String STORESCANNER_PREAD_MAX_BYTES = "hbase.storescanner.pread.max.bytes";
+
+ private final Scan.ReadType readType;
// A flag whether use pread for scan
- private final boolean scanUsePread;
+ // it maybe changed if we use Scan.ReadType.DEFAULT and we have read lots of data.
+ private boolean scanUsePread;
// Indicates whether there was flush during the course of the scan
- protected volatile boolean flushed = false;
+ private volatile boolean flushed = false;
// generally we get one file from a flush
- protected List<StoreFile> flushedStoreFiles = new ArrayList<>(1);
+ private final List<StoreFile> flushedStoreFiles = new ArrayList<>(1);
// The current list of scanners
- protected List<KeyValueScanner> currentScanners = new ArrayList<>();
+ private final List<KeyValueScanner> currentScanners = new ArrayList<>();
// flush update lock
- private ReentrantLock flushLock = new ReentrantLock();
+ private final ReentrantLock flushLock = new ReentrantLock();
protected final long readPt;
@@ -155,7 +169,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
int numCol = columns == null ? 0 : columns.size();
explicitColumnQuery = numCol > 0;
this.scan = scan;
- this.columns = columns;
this.now = EnvironmentEdgeManager.currentTime();
this.oldestUnexpiredTS = scan.isRaw() ? 0L : now - scanInfo.getTtl();
this.minVersions = scanInfo.getMinVersions();
@@ -168,32 +181,31 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
this.maxRowSize = scanInfo.getTableMaxRowSize();
if (get) {
+ this.readType = Scan.ReadType.PREAD;
this.scanUsePread = true;
} else {
- switch (scan.getReadType()) {
- case STREAM:
- this.scanUsePread = false;
- break;
- case PREAD:
- this.scanUsePread = true;
- break;
- default:
- this.scanUsePread = scanInfo.isUsePread();
- break;
+ if (scan.getReadType() == Scan.ReadType.DEFAULT) {
+ this.readType = scanInfo.isUsePread() ? Scan.ReadType.PREAD : Scan.ReadType.DEFAULT;
+ } else {
+ this.readType = scan.getReadType();
+ }
+ // Always start with pread unless user specific stream. Will change to stream later if
+ // readType is default if the scan keeps running for a long time.
+ this.scanUsePread = this.readType != Scan.ReadType.STREAM;
+ }
+ this.preadMaxBytes = scanInfo.getPreadMaxBytes();
+ this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck();
+ // Parallel seeking is on if the config allows and more there is more than one store file.
+ if (this.store != null && this.store.getStorefilesCount() > 1) {
+ RegionServerServices rsService = ((HStore) store).getHRegion().getRegionServerServices();
+ if (rsService != null && scanInfo.isParallelSeekEnabled()) {
+ this.parallelSeekEnabled = true;
+ this.executor = rsService.getExecutorService();
}
}
- this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck();
- // Parallel seeking is on if the config allows and more there is more than one store file.
- if (this.store != null && this.store.getStorefilesCount() > 1) {
- RegionServerServices rsService = ((HStore)store).getHRegion().getRegionServerServices();
- if (rsService != null && scanInfo.isParallelSeekEnabled()) {
- this.parallelSeekEnabled = true;
- this.executor = rsService.getExecutorService();
- }
- }
}
- protected void addCurrentScanners(List<? extends KeyValueScanner> scanners) {
+ private void addCurrentScanners(List<? extends KeyValueScanner> scanners) {
this.currentScanners.addAll(scanners);
}
@@ -360,9 +372,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
* Get a filtered list of scanners. Assumes we are not in a compaction.
* @return list of scanners to seek
*/
- protected List<KeyValueScanner> getScannersNoCompaction() throws IOException {
- return selectScannersFrom(store.getScanners(cacheBlocks, get, scanUsePread, false, matcher,
- scan.getStartRow(), scan.getStopRow(), this.readPt));
+ private List<KeyValueScanner> getScannersNoCompaction() throws IOException {
+ return selectScannersFrom(
+ store.getScanners(cacheBlocks, scanUsePread, false, matcher, scan.getStartRow(),
+ scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), this.readPt));
}
/**
@@ -413,7 +426,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
/**
* Filters the given list of scanners using Bloom filter, time range, and
* TTL.
+ * <p>
+ * Will be overridden by testcase so declared as protected.
*/
+ @VisibleForTesting
protected List<KeyValueScanner> selectScannersFrom(
final List<? extends KeyValueScanner> allScanners) {
boolean memOnly;
@@ -451,10 +467,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
@Override
public Cell peek() {
- if (this.heap == null) {
- return this.lastTop;
- }
- return this.heap.peek();
+ return heap != null ? heap.peek() : null;
}
@Override
@@ -472,9 +485,13 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
if (this.closing) {
return;
}
- if (withHeapClose) this.closing = true;
+ if (withHeapClose) {
+ this.closing = true;
+ }
// Under test, we dont have a this.store
- if (this.store != null) this.store.deleteChangedReaderObserver(this);
+ if (this.store != null) {
+ this.store.deleteChangedReaderObserver(this);
+ }
if (withHeapClose) {
for (KeyValueHeap h : this.heapsForDelayedClose) {
h.close();
@@ -492,14 +509,13 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
this.heap = null;
}
}
- this.lastTop = null; // If both are null, we are closed.
}
@Override
public boolean seek(Cell key) throws IOException {
- boolean flushed = checkFlushed();
- // reset matcher state, in case that underlying store changed
- checkReseek(flushed);
+ if (checkFlushed()) {
+ reopenAfterFlush();
+ }
return this.heap.seek(key);
}
@@ -519,8 +535,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
if (scannerContext == null) {
throw new IllegalArgumentException("Scanner context cannot be null");
}
- boolean flushed = checkFlushed();
- if (checkReseek(flushed)) {
+ trySwitchToStreamRead();
+ if (checkFlushed() && reopenAfterFlush()) {
return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
}
@@ -550,7 +566,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
}
// Clear progress away unless invoker has indicated it should be kept.
- if (!scannerContext.getKeepProgress()) scannerContext.clearProgress();
+ if (!scannerContext.getKeepProgress()) {
+ scannerContext.clearProgress();
+ }
// Only do a sanity-check if store and comparator are available.
CellComparator comparator = store != null ? store.getComparator() : null;
@@ -566,9 +584,13 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
}
}
-
- if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevKV from the same heap.
+ // Do object compare - we set prevKV from the same heap.
+ if (prevCell != cell) {
+ ++kvsScanned;
+ }
checkScanOrder(prevCell, cell, comparator);
+ int cellSize = CellUtil.estimatedSerializedSizeOf(cell);
+ bytesRead += cellSize;
prevCell = cell;
ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
switch (qcode) {
@@ -600,7 +622,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
// Update local tracking information
count++;
- int cellSize = CellUtil.estimatedSerializedSizeOf(cell);
totalBytesRead += cellSize;
// Update the progress of the scanner context
@@ -636,7 +657,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
case DONE:
// Optimization for Gets! If DONE, no more to get on this row, early exit!
- if (this.scan.isGetScan()) {
+ if (get) {
// Then no more to this row... exit.
close(false);// Do all cleanup except heap.close()
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
@@ -807,34 +828,16 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
}
/**
- * @param flushed indicates if there was a flush
- * @return true if top of heap has changed (and KeyValueHeap has to try the
- * next KV)
- * @throws IOException
+ * @return if top of heap has changed (and KeyValueHeap has to try the next KV)
*/
- protected boolean checkReseek(boolean flushed) throws IOException {
- if (flushed && this.lastTop != null) {
- resetScannerStack(this.lastTop);
- if (this.heap.peek() == null
- || store.getComparator().compareRows(this.lastTop, this.heap.peek()) != 0) {
- LOG.debug("Storescanner.peek() is changed where before = "
- + this.lastTop.toString() + ",and after = " + this.heap.peek());
- this.lastTop = null;
- return true;
- }
- this.lastTop = null; // gone!
- }
- // else dont need to reseek
- return false;
- }
-
- protected void resetScannerStack(Cell lastTopKey) throws IOException {
+ protected final boolean reopenAfterFlush() throws IOException {
+ Cell lastTop = heap.peek();
// When we have the scan object, should we not pass it to getScanners() to get a limited set of
// scanners? We did so in the constructor and we could have done it now by storing the scan
// object from the constructor
- List<KeyValueScanner> scanners = null;
+ List<KeyValueScanner> scanners;
+ flushLock.lock();
try {
- flushLock.lock();
scanners = selectScannersFrom(store.getScanners(flushedStoreFiles, cacheBlocks, get,
scanUsePread, false, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, true));
// Clear the current set of flushed store files so that they don't get added again
@@ -844,7 +847,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
}
// Seek the new scanners to the last key
- seekScanners(scanners, lastTopKey, false, parallelSeekEnabled);
+ seekScanners(scanners, lastTop, false, parallelSeekEnabled);
// remove the older memstore scanner
for (int i = 0; i < currentScanners.size(); i++) {
if (!currentScanners.get(i).isFileScanner()) {
@@ -856,6 +859,18 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
addCurrentScanners(scanners);
// Combine all seeked scanners with a heap
resetKVHeap(this.currentScanners, store.getComparator());
+ resetQueryMatcher(lastTop);
+ if (heap.peek() == null || store.getComparator().compareRows(lastTop, this.heap.peek()) != 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Storescanner.peek() is changed where before = " + lastTop.toString() +
+ ",and after = " + heap.peek());
+ }
+ return true;
+ }
+ return false;
+ }
+
+ private void resetQueryMatcher(Cell lastTopKey) {
// Reset the state of the Query Matcher and set to top row.
// Only reset and call setRow if the row changes; avoids confusing the
// query matcher if scanning intra-row.
@@ -902,18 +917,73 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
@Override
public boolean reseek(Cell kv) throws IOException {
- boolean flushed = checkFlushed();
- // Heap will not be null, if this is called from next() which.
- // If called from RegionScanner.reseek(...) make sure the scanner
- // stack is reset if needed.
- checkReseek(flushed);
+ if (checkFlushed()) {
+ reopenAfterFlush();
+ }
if (explicitColumnQuery && lazySeekEnabledGlobally) {
return heap.requestSeek(kv, true, useRowColBloom);
}
return heap.reseek(kv);
}
- protected boolean checkFlushed() {
+ private void trySwitchToStreamRead() throws IOException {
+ if (readType != Scan.ReadType.DEFAULT || !scanUsePread || closing || heap.peek() == null ||
+ bytesRead < preadMaxBytes) {
+ return;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Switch to stream read because we have already read " + bytesRead +
+ " bytes from this scanner");
+ }
+ scanUsePread = false;
+ Cell lastTop = heap.peek();
+ Map<String, StoreFile> name2File = new HashMap<>(store.getStorefilesCount());
+ for (StoreFile file : store.getStorefiles()) {
+ name2File.put(file.getFileInfo().getActiveFileName(), file);
+ }
+ List<StoreFile> filesToReopen = new ArrayList<>();
+ List<KeyValueScanner> memstoreScanners = new ArrayList<>();
+ List<KeyValueScanner> fileScanners = null;
+ List<KeyValueScanner> scannersToClose = new ArrayList<>();
+ boolean succ = false;
+ try {
+ for (KeyValueScanner kvs : currentScanners) {
+ if (!kvs.isFileScanner()) {
+ memstoreScanners.add(kvs);
+ } else {
+ scannersToClose.add(kvs);
+ if (kvs.peek() == null) {
+ continue;
+ }
+ filesToReopen.add(name2File.get(kvs.getFilePath().getName()));
+ }
+ }
+ if (filesToReopen.isEmpty()) {
+ return;
+ }
+ fileScanners =
+ store.getScanners(filesToReopen, cacheBlocks, false, false, matcher, scan.getStartRow(),
+ scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), readPt, false);
+ seekScanners(fileScanners, lastTop, false, parallelSeekEnabled);
+ currentScanners.clear();
+ addCurrentScanners(fileScanners);
+ addCurrentScanners(memstoreScanners);
+ resetKVHeap(currentScanners, store.getComparator());
+ resetQueryMatcher(lastTop);
+ for (KeyValueScanner kvs : scannersToClose) {
+ kvs.close();
+ }
+ succ = true;
+ } finally {
+ if (!succ && fileScanners != null) {
+ for (KeyValueScanner scanner : fileScanners) {
+ scanner.close();
+ }
+ }
+ }
+ }
+
+ protected final boolean checkFlushed() {
// check the var without any lock. Suppose even if we see the old
// value here still it is ok to continue because we will not be resetting
// the heap but will continue with the referenced memstore's snapshot. For compactions
@@ -922,9 +992,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
if (flushed) {
// If there is a flush and the current scan is notified on the flush ensure that the
// scan's heap gets reset and we do a seek on the newly flushed file.
- if(!this.closing) {
- this.lastTop = this.peek();
- } else {
+ if (this.closing) {
return false;
}
// reset the flag
@@ -983,6 +1051,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
* Used in testing.
* @return all scanners in no particular order
*/
+ @VisibleForTesting
List<KeyValueScanner> getAllScannersForTesting() {
List<KeyValueScanner> allScanners = new ArrayList<>();
KeyValueScanner current = heap.getCurrentForTesting();
http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
index f12a334..f65459f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
@@ -17,12 +17,16 @@
*/
package org.apache.hadoop.hbase;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.Lists;
+
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
-import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -31,8 +35,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
@@ -45,7 +47,9 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
@@ -53,28 +57,26 @@ import org.apache.hadoop.hbase.wal.WAL;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
/**
- * Test for the case where a regionserver going down has enough cycles to do damage to regions
- * that have actually been assigned elsehwere.
- *
- * <p>If we happen to assign a region before it fully done with in its old location -- i.e. it is on two servers at the
- * same time -- all can work fine until the case where the region on the dying server decides to compact or otherwise
- * change the region file set. The region in its new location will then get a surprise when it tries to do something
- * w/ a file removed by the region in its old location on dying server.
- *
- * <p>Making a test for this case is a little tough in that even if a file is deleted up on the namenode,
- * if the file was opened before the delete, it will continue to let reads happen until something changes the
- * state of cached blocks in the dfsclient that was already open (a block from the deleted file is cleaned
- * from the datanode by NN).
- *
- * <p>What we will do below is do an explicit check for existence on the files listed in the region that
- * has had some files removed because of a compaction. This sort of hurry's along and makes certain what is a chance
- * occurance.
+ * Test for the case where a regionserver going down has enough cycles to do damage to regions that
+ * have actually been assigned elsehwere.
+ * <p>
+ * If we happen to assign a region before it fully done with in its old location -- i.e. it is on
+ * two servers at the same time -- all can work fine until the case where the region on the dying
+ * server decides to compact or otherwise change the region file set. The region in its new location
+ * will then get a surprise when it tries to do something w/ a file removed by the region in its old
+ * location on dying server.
+ * <p>
+ * Making a test for this case is a little tough in that even if a file is deleted up on the
+ * namenode, if the file was opened before the delete, it will continue to let reads happen until
+ * something changes the state of cached blocks in the dfsclient that was already open (a block from
+ * the deleted file is cleaned from the datanode by NN).
+ * <p>
+ * What we will do below is do an explicit check for existence on the files listed in the region
+ * that has had some files removed because of a compaction. This sort of hurry's along and makes
+ * certain what is a chance occurance.
*/
-@Category({MiscTests.class, MediumTests.class})
+@Category({MiscTests.class, LargeTests.class})
public class TestIOFencing {
private static final Log LOG = LogFactory.getLog(TestIOFencing.class);
static {
@@ -334,23 +336,38 @@ public class TestIOFencing {
while (compactingRegion.compactCount == 0) {
Thread.sleep(1000);
}
- // The server we killed stays up until the compaction that was started before it was killed completes. In logs
- // you should see the old regionserver now going down.
+ // The server we killed stays up until the compaction that was started before it was killed
+ // completes. In logs you should see the old regionserver now going down.
LOG.info("Compaction finished");
// If we survive the split keep going...
// Now we make sure that the region isn't totally confused. Load up more rows.
- TEST_UTIL.loadNumericRows(table, FAMILY, FIRST_BATCH_COUNT, FIRST_BATCH_COUNT + SECOND_BATCH_COUNT);
+ TEST_UTIL.loadNumericRows(table, FAMILY, FIRST_BATCH_COUNT,
+ FIRST_BATCH_COUNT + SECOND_BATCH_COUNT);
admin.majorCompact(TABLE_NAME);
startWaitTime = System.currentTimeMillis();
while (newRegion.compactCount == 0) {
Thread.sleep(1000);
- assertTrue("New region never compacted", System.currentTimeMillis() - startWaitTime < 180000);
+ assertTrue("New region never compacted",
+ System.currentTimeMillis() - startWaitTime < 180000);
+ }
+ int count;
+ for (int i = 0;; i++) {
+ try {
+ count = TEST_UTIL.countRows(table);
+ break;
+ } catch (DoNotRetryIOException e) {
+ // wait up to 30s
+ if (i >= 30 || !e.getMessage().contains("File does not exist")) {
+ throw e;
+ }
+ Thread.sleep(1000);
+ }
}
- if(policy == MemoryCompactionPolicy.EAGER) {
- assertTrue(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT >= TEST_UTIL.countRows(table));
+ if (policy == MemoryCompactionPolicy.EAGER) {
+ assertTrue(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT >= count);
} else {
- assertEquals(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT, TEST_UTIL.countRows(table));
+ assertEquals(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT, count);
}
} finally {
if (compactingRegion != null) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
index 68c4587..cb1c932 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
@@ -671,7 +671,8 @@ public class TestHFileBlock {
while (System.currentTimeMillis() < endTime) {
int blockId = rand.nextInt(NUM_TEST_BLOCKS);
long offset = offsets.get(blockId);
- boolean pread = rand.nextBoolean();
+ // now we only support concurrent read with pread = true
+ boolean pread = true;
boolean withOnDiskSize = rand.nextBoolean();
long expectedSize =
(blockId == NUM_TEST_BLOCKS - 1 ? fileSize
http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java
index 51a2a97..403d880 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
@@ -93,6 +94,11 @@ public class DelegatingKeyValueScanner implements KeyValueScanner {
}
@Override
+ public Path getFilePath() {
+ return delegate.getFilePath();
+ }
+
+ @Override
public boolean backwardSeek(Cell key) throws IOException {
return delegate.backwardSeek(key);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java
index d52c6c7..91b85d3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java
@@ -145,10 +145,17 @@ public class MockStoreFile extends StoreFile {
}
@Override
+ public StoreFileScanner getPreadScanner(boolean cacheBlocks, long readPt, long scannerOrder,
+ boolean canOptimizeForNonNullColumn) {
+ return getReader().getStoreFileScanner(cacheBlocks, true, false, readPt, scannerOrder,
+ canOptimizeForNonNullColumn);
+ }
+
+ @Override
public StoreFileScanner getStreamScanner(boolean canUseDropBehind, boolean cacheBlocks,
- boolean pread, boolean isCompaction, long readPt, long scannerOrder,
- boolean canOptimizeForNonNullColumn) throws IOException {
- return getReader().getStoreFileScanner(cacheBlocks, pread, isCompaction, readPt, scannerOrder,
+ boolean isCompaction, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn)
+ throws IOException {
+ return getReader().getStoreFileScanner(cacheBlocks, false, isCompaction, readPt, scannerOrder,
canOptimizeForNonNullColumn);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBlocksScanned.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBlocksScanned.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBlocksScanned.java
index 497fd03..c28e48b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBlocksScanned.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBlocksScanned.java
@@ -96,14 +96,14 @@ public class TestBlocksScanned extends HBaseTestCase {
CacheStats stats = new CacheConfig(TEST_UTIL.getConfiguration()).getBlockCache().getStats();
long before = stats.getHitCount() + stats.getMissCount();
// Do simple test of getting one row only first.
- Scan scan = new Scan(Bytes.toBytes("aaa"), Bytes.toBytes("aaz"));
+ Scan scan = new Scan().withStartRow(Bytes.toBytes("aaa")).withStopRow(Bytes.toBytes("aaz"))
+ .setReadType(Scan.ReadType.PREAD);
scan.addColumn(FAMILY, COL);
scan.setMaxVersions(1);
InternalScanner s = r.getScanner(scan);
List<Cell> results = new ArrayList<>();
- while (s.next(results))
- ;
+ while (s.next(results));
s.close();
int expectResultSize = 'z' - 'a';
http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
index 9e90f3e..04435db 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
@@ -192,7 +192,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
Configuration conf = HBaseConfiguration.create();
for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) {
ScanInfo scanInfo = new ScanInfo(conf, FAMILY, 0, 1, Integer.MAX_VALUE,
- KeepDeletedCells.FALSE, 0, this.memstore.getComparator());
+ KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator());
ScanType scanType = ScanType.USER_SCAN;
InternalScanner scanner = new StoreScanner(new Scan(
Bytes.toBytes(startRowId)), scanInfo, scanType, null,
http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
index 1bf6ea7..5f4c0aa 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
@@ -212,9 +212,9 @@ public class TestCompaction {
for (Store hstore: this.r.stores.values()) {
HStore store = (HStore)hstore;
ScanInfo old = store.getScanInfo();
- ScanInfo si = new ScanInfo(old.getConfiguration(), old.getFamily(),
- old.getMinVersions(), old.getMaxVersions(), ttl,
- old.getKeepDeletedCells(), 0, old.getComparator());
+ ScanInfo si = new ScanInfo(old.getConfiguration(), old.getFamily(), old.getMinVersions(),
+ old.getMaxVersions(), ttl, old.getKeepDeletedCells(), HConstants.DEFAULT_BLOCKSIZE, 0,
+ old.getComparator());
store.setScanInfo(si);
}
Thread.sleep(ttl);
http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java
index 3c41fc5..584285b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java
@@ -160,8 +160,8 @@ public class TestDefaultCompactSelection extends TestCompactionPolicy {
ScanInfo oldScanInfo = store.getScanInfo();
ScanInfo newScanInfo = new ScanInfo(oldScanInfo.getConfiguration(), oldScanInfo.getFamily(),
oldScanInfo.getMinVersions(), oldScanInfo.getMaxVersions(), 600,
- oldScanInfo.getKeepDeletedCells(), oldScanInfo.getTimeToPurgeDeletes(),
- oldScanInfo.getComparator());
+ oldScanInfo.getKeepDeletedCells(), oldScanInfo.getPreadMaxBytes(),
+ oldScanInfo.getTimeToPurgeDeletes(), oldScanInfo.getComparator());
store.setScanInfo(newScanInfo);
// Do not compact empty store file
List<StoreFile> candidates = sfCreate(0);
http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
index 3acb48b..3b15ff3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
@@ -162,9 +162,8 @@ public class TestDefaultMemStore {
Scan scan = new Scan();
List<Cell> result = new ArrayList<>();
Configuration conf = HBaseConfiguration.create();
- ScanInfo scanInfo =
- new ScanInfo(conf, null, 0, 1, HConstants.LATEST_TIMESTAMP, KeepDeletedCells.FALSE, 0,
- this.memstore.getComparator());
+ ScanInfo scanInfo = new ScanInfo(conf, null, 0, 1, HConstants.LATEST_TIMESTAMP,
+ KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator());
ScanType scanType = ScanType.USER_SCAN;
StoreScanner s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners);
int count = 0;
@@ -602,7 +601,7 @@ public class TestDefaultMemStore {
Configuration conf = HBaseConfiguration.create();
for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) {
ScanInfo scanInfo = new ScanInfo(conf, FAMILY, 0, 1, Integer.MAX_VALUE,
- KeepDeletedCells.FALSE, 0, this.memstore.getComparator());
+ KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator());
ScanType scanType = ScanType.USER_SCAN;
try (InternalScanner scanner = new StoreScanner(new Scan(
Bytes.toBytes(startRowId)), scanInfo, scanType, null,
http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
index 9d00d38..0b35f95 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
@@ -292,9 +292,9 @@ public class TestMajorCompaction {
for (Store hstore : r.getStores()) {
HStore store = ((HStore) hstore);
ScanInfo old = store.getScanInfo();
- ScanInfo si = new ScanInfo(old.getConfiguration(), old.getFamily(),
- old.getMinVersions(), old.getMaxVersions(), ttl,
- old.getKeepDeletedCells(), 0, old.getComparator());
+ ScanInfo si = new ScanInfo(old.getConfiguration(), old.getFamily(), old.getMinVersions(),
+ old.getMaxVersions(), ttl, old.getKeepDeletedCells(), old.getPreadMaxBytes(), 0,
+ old.getComparator());
store.setScanInfo(si);
}
Thread.sleep(1000);
http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
index c1fd6a3..2dfdf5b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
@@ -264,8 +264,9 @@ public class TestReversibleScanners {
BloomType.NONE, true);
ScanType scanType = ScanType.USER_SCAN;
- ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), FAMILYNAME, 0, Integer.MAX_VALUE,
- Long.MAX_VALUE, KeepDeletedCells.FALSE, 0, CellComparator.COMPARATOR);
+ ScanInfo scanInfo =
+ new ScanInfo(TEST_UTIL.getConfiguration(), FAMILYNAME, 0, Integer.MAX_VALUE, Long.MAX_VALUE,
+ KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.COMPARATOR);
// Case 1.Test a full reversed scan
Scan scan = new Scan();
http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
index 3e2949c..524af34 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
@@ -67,8 +67,8 @@ public class TestStoreScanner {
private static final String CF_STR = "cf";
private static final byte [] CF = Bytes.toBytes(CF_STR);
static Configuration CONF = HBaseConfiguration.create();
- private ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, Integer.MAX_VALUE,
- Long.MAX_VALUE, KeepDeletedCells.FALSE, 0, CellComparator.COMPARATOR);
+ private ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, Integer.MAX_VALUE, Long.MAX_VALUE,
+ KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.COMPARATOR);
private ScanType scanType = ScanType.USER_SCAN;
/**
@@ -829,8 +829,8 @@ public class TestStoreScanner {
List<KeyValueScanner> scanners = scanFixture(kvs);
Scan scan = new Scan();
scan.setMaxVersions(1);
- ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, 1, 500, KeepDeletedCells.FALSE, 0,
- CellComparator.COMPARATOR);
+ ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, 1, 500, KeepDeletedCells.FALSE,
+ HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.COMPARATOR);
ScanType scanType = ScanType.USER_SCAN;
try (StoreScanner scanner = new StoreScanner(scan, scanInfo, scanType, null, scanners)) {
List<Cell> results = new ArrayList<>();
@@ -902,8 +902,8 @@ public class TestStoreScanner {
Scan scan = new Scan();
scan.setMaxVersions(1);
// scanner with ttl equal to 500
- ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, 1, 500, KeepDeletedCells.FALSE, 0,
- CellComparator.COMPARATOR);
+ ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, 1, 500, KeepDeletedCells.FALSE,
+ HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.COMPARATOR);
ScanType scanType = ScanType.USER_SCAN;
try (StoreScanner scanner =
new StoreScanner(scan, scanInfo, scanType, null, scanners)) {
@@ -968,6 +968,7 @@ public class TestStoreScanner {
0 /* minVersions */,
2 /* maxVersions */, 500 /* ttl */,
KeepDeletedCells.FALSE /* keepDeletedCells */,
+ HConstants.DEFAULT_BLOCKSIZE /* block size */,
200, /* timeToPurgeDeletes */
CellComparator.COMPARATOR);
try (StoreScanner scanner =
http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java
new file mode 100644
index 0000000..fb978b1
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestSwitchToStreamRead {
+
+ private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+ private static TableName TABLE_NAME = TableName.valueOf("stream");
+
+ private static byte[] FAMILY = Bytes.toBytes("cf");
+
+ private static byte[] QUAL = Bytes.toBytes("cq");
+
+ private static String VALUE_PREFIX;
+
+ private static HRegion REGION;
+
+ @BeforeClass
+ public static void setUp() throws IOException {
+ UTIL.getConfiguration().setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 2048);
+ StringBuilder sb = new StringBuilder(256);
+ for (int i = 0; i < 255; i++) {
+ sb.append((char) ThreadLocalRandom.current().nextInt('A', 'z' + 1));
+ }
+ VALUE_PREFIX = sb.append("-").toString();
+ REGION = UTIL.createLocalHRegion(
+ new HTableDescriptor(TABLE_NAME).addFamily(new HColumnDescriptor(FAMILY).setBlocksize(1024)),
+ null, null);
+ for (int i = 0; i < 900; i++) {
+ REGION
+ .put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(VALUE_PREFIX + i)));
+ }
+ REGION.flush(true);
+ for (int i = 900; i < 1000; i++) {
+ REGION
+ .put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(VALUE_PREFIX + i)));
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() throws IOException {
+ REGION.close(true);
+ UTIL.cleanupTestDir();
+ }
+
+ @Test
+ public void test() throws IOException {
+ try (RegionScanner scanner = REGION.getScanner(new Scan())) {
+ StoreScanner storeScanner = (StoreScanner) ((RegionScannerImpl) scanner)
+ .getStoreHeapForTesting().getCurrentForTesting();
+ for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
+ if (kvs instanceof StoreFileScanner) {
+ StoreFileScanner sfScanner = (StoreFileScanner) kvs;
+ // starting from pread so we use shared reader here.
+ assertTrue(sfScanner.getReader().shared);
+ }
+ }
+ List<Cell> cells = new ArrayList<>();
+ for (int i = 0; i < 500; i++) {
+ assertTrue(scanner.next(cells));
+ Result result = Result.create(cells);
+ assertEquals(VALUE_PREFIX + i, Bytes.toString(result.getValue(FAMILY, QUAL)));
+ cells.clear();
+ }
+ for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
+ if (kvs instanceof StoreFileScanner) {
+ StoreFileScanner sfScanner = (StoreFileScanner) kvs;
+ // we should have convert to use stream read now.
+ assertFalse(sfScanner.getReader().shared);
+ }
+ }
+ for (int i = 500; i < 1000; i++) {
+ assertEquals(i != 999, scanner.next(cells));
+ Result result = Result.create(cells);
+ assertEquals(VALUE_PREFIX + i, Bytes.toString(result.getValue(FAMILY, QUAL)));
+ cells.clear();
+ }
+ }
+ // make sure all scanners are closed.
+ for (StoreFile sf : REGION.getStore(FAMILY).getStorefiles()) {
+ assertFalse(sf.isReferencedInReads());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestCompactionScanQueryMatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestCompactionScanQueryMatcher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestCompactionScanQueryMatcher.java
index af8c27d..73c92e4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestCompactionScanQueryMatcher.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestCompactionScanQueryMatcher.java
@@ -73,8 +73,8 @@ public class TestCompactionScanQueryMatcher extends AbstractTestScanQueryMatcher
throws IOException {
long now = EnvironmentEdgeManager.currentTime();
// Set time to purge deletes to negative value to avoid it ever happening.
- ScanInfo scanInfo = new ScanInfo(this.conf, fam2, 0, 1, ttl, KeepDeletedCells.FALSE, -1L,
- rowComparator);
+ ScanInfo scanInfo = new ScanInfo(this.conf, fam2, 0, 1, ttl, KeepDeletedCells.FALSE,
+ HConstants.DEFAULT_BLOCKSIZE, -1L, rowComparator);
CompactionScanQueryMatcher qm = CompactionScanQueryMatcher.create(scanInfo,
ScanType.COMPACT_RETAIN_DELETES, Long.MAX_VALUE, HConstants.OLDEST_TIMESTAMP,
http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestUserScanQueryMatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestUserScanQueryMatcher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestUserScanQueryMatcher.java
index b4e4311..f3cf604 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestUserScanQueryMatcher.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestUserScanQueryMatcher.java
@@ -54,7 +54,8 @@ public class TestUserScanQueryMatcher extends AbstractTestScanQueryMatcher {
long now = EnvironmentEdgeManager.currentTime();
// Do with fam2 which has a col2 qualifier.
UserScanQueryMatcher qm = UserScanQueryMatcher.create(scan,
- new ScanInfo(this.conf, fam2, 10, 1, ttl, KeepDeletedCells.FALSE, 0, rowComparator),
+ new ScanInfo(this.conf, fam2, 10, 1, ttl, KeepDeletedCells.FALSE,
+ HConstants.DEFAULT_BLOCKSIZE, 0, rowComparator),
get.getFamilyMap().get(fam2), now - ttl, now, null);
Cell kv = new KeyValue(row1, fam2, col2, 1, data);
Cell cell = CellUtil.createLastOnRowCol(kv);
@@ -79,8 +80,9 @@ public class TestUserScanQueryMatcher extends AbstractTestScanQueryMatcher {
long now = EnvironmentEdgeManager.currentTime();
// 2,4,5
- UserScanQueryMatcher qm = UserScanQueryMatcher.create(scan,
- new ScanInfo(this.conf, fam2, 0, 1, ttl, KeepDeletedCells.FALSE, 0, rowComparator),
+ UserScanQueryMatcher qm = UserScanQueryMatcher.create(
+ scan, new ScanInfo(this.conf, fam2, 0, 1, ttl, KeepDeletedCells.FALSE,
+ HConstants.DEFAULT_BLOCKSIZE, 0, rowComparator),
get.getFamilyMap().get(fam2), now - ttl, now, null);
List<KeyValue> memstore = new ArrayList<>(6);
@@ -122,9 +124,9 @@ public class TestUserScanQueryMatcher extends AbstractTestScanQueryMatcher {
expected.add(ScanQueryMatcher.MatchCode.DONE);
long now = EnvironmentEdgeManager.currentTime();
- UserScanQueryMatcher qm = UserScanQueryMatcher.create(scan,
- new ScanInfo(this.conf, fam2, 0, 1, ttl, KeepDeletedCells.FALSE, 0, rowComparator), null,
- now - ttl, now, null);
+ UserScanQueryMatcher qm = UserScanQueryMatcher.create(scan, new ScanInfo(this.conf, fam2, 0, 1,
+ ttl, KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, rowComparator),
+ null, now - ttl, now, null);
List<KeyValue> memstore = new ArrayList<>(6);
memstore.add(new KeyValue(row1, fam2, col1, 1, data));
@@ -168,7 +170,8 @@ public class TestUserScanQueryMatcher extends AbstractTestScanQueryMatcher {
long now = EnvironmentEdgeManager.currentTime();
UserScanQueryMatcher qm = UserScanQueryMatcher.create(scan,
- new ScanInfo(this.conf, fam2, 0, 1, testTTL, KeepDeletedCells.FALSE, 0, rowComparator),
+ new ScanInfo(this.conf, fam2, 0, 1, testTTL, KeepDeletedCells.FALSE,
+ HConstants.DEFAULT_BLOCKSIZE, 0, rowComparator),
get.getFamilyMap().get(fam2), now - testTTL, now, null);
KeyValue[] kvs = new KeyValue[] { new KeyValue(row1, fam2, col1, now - 100, data),
@@ -209,9 +212,9 @@ public class TestUserScanQueryMatcher extends AbstractTestScanQueryMatcher {
ScanQueryMatcher.MatchCode.DONE };
long now = EnvironmentEdgeManager.currentTime();
- UserScanQueryMatcher qm = UserScanQueryMatcher.create(scan,
- new ScanInfo(this.conf, fam2, 0, 1, testTTL, KeepDeletedCells.FALSE, 0, rowComparator), null,
- now - testTTL, now, null);
+ UserScanQueryMatcher qm = UserScanQueryMatcher.create(scan, new ScanInfo(this.conf, fam2, 0, 1,
+ testTTL, KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, rowComparator),
+ null, now - testTTL, now, null);
KeyValue[] kvs = new KeyValue[] { new KeyValue(row1, fam2, col1, now - 100, data),
new KeyValue(row1, fam2, col2, now - 50, data),
http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java
index 27e93a0..720ad29 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java
@@ -245,11 +245,10 @@ public class TestCoprocessorScanPolicy {
Integer newVersions = versions.get(store.getTableName());
ScanInfo oldSI = store.getScanInfo();
HColumnDescriptor family = store.getFamily();
- ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(),
- family.getName(), family.getMinVersions(),
- newVersions == null ? family.getMaxVersions() : newVersions,
+ ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), family.getName(),
+ family.getMinVersions(), newVersions == null ? family.getMaxVersions() : newVersions,
newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(),
- oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
+ family.getBlocksize(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
Scan scan = new Scan();
scan.setMaxVersions(newVersions == null ? oldSI.getMaxVersions() : newVersions);
return new StoreScanner(store, scanInfo, scan, scanners,
@@ -266,11 +265,10 @@ public class TestCoprocessorScanPolicy {
Integer newVersions = versions.get(store.getTableName());
ScanInfo oldSI = store.getScanInfo();
HColumnDescriptor family = store.getFamily();
- ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(),
- family.getName(), family.getMinVersions(),
- newVersions == null ? family.getMaxVersions() : newVersions,
+ ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), family.getName(),
+ family.getMinVersions(), newVersions == null ? family.getMaxVersions() : newVersions,
newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(),
- oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
+ family.getBlocksize(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
Scan scan = new Scan();
scan.setMaxVersions(newVersions == null ? oldSI.getMaxVersions() : newVersions);
return new StoreScanner(store, scanInfo, scan, scanners, scanType,
@@ -287,11 +285,10 @@ public class TestCoprocessorScanPolicy {
Integer newVersions = versions.get(store.getTableName());
ScanInfo oldSI = store.getScanInfo();
HColumnDescriptor family = store.getFamily();
- ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(),
- family.getName(), family.getMinVersions(),
- newVersions == null ? family.getMaxVersions() : newVersions,
+ ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), family.getName(),
+ family.getMinVersions(), newVersions == null ? family.getMaxVersions() : newVersions,
newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(),
- oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
+ family.getBlocksize(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
return new StoreScanner(store, scanInfo, scan, targetCols, readPt);
} else {
return s;