You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jx...@apache.org on 2013/05/25 00:51:23 UTC
svn commit: r1486246 - in /hbase/trunk:
hbase-client/src/main/java/org/apache/hadoop/hbase/client/
hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/
hbase-common/src/main/java/org/apache/hadoop/hbase/util/
hbase-protocol/src/main/java/org/ap...
Author: jxiang
Date: Fri May 24 22:51:22 2013
New Revision: 1486246
URL: http://svn.apache.org/r1486246
Log:
HBASE-8420 Port HBASE-6874 Implement prefetching for scanners from 0.89-fb
Added:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerHolder.java (with props)
Modified:
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
hbase/trunk/hbase-protocol/src/main/protobuf/Client.proto
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java?rev=1486246&r1=1486245&r2=1486246&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java Fri May 24 22:51:22 2013
@@ -128,7 +128,7 @@ public class ClientScanner extends Abstr
}
// initialize the scanner
- nextScanner(this.caching, false);
+ nextScanner(false);
}
protected HConnection getConnection() {
@@ -169,10 +169,9 @@ public class ClientScanner extends Abstr
* scanner at the scan.getStartRow(). We will go no further, just tidy
* up outstanding scanners, if <code>currentRegion != null</code> and
* <code>done</code> is true.
- * @param nbRows
* @param done Server-side says we're done scanning.
*/
- private boolean nextScanner(int nbRows, final boolean done)
+ private boolean nextScanner(final boolean done)
throws IOException {
// Close the previous scanner if it's open
if (this.callable != null) {
@@ -210,7 +209,7 @@ public class ClientScanner extends Abstr
Bytes.toStringBinary(localStartKey) + "'");
}
try {
- callable = getScannerCallable(localStartKey, nbRows);
+ callable = getScannerCallable(localStartKey);
// Open a scanner on the region server starting at the
// beginning of the region
callable.withRetries();
@@ -225,12 +224,11 @@ public class ClientScanner extends Abstr
return true;
}
- protected ScannerCallable getScannerCallable(byte [] localStartKey,
- int nbRows) {
+ protected ScannerCallable getScannerCallable(byte [] localStartKey) {
scan.setStartRow(localStartKey);
ScannerCallable s = new ScannerCallable(getConnection(),
getTableName(), scan, this.scanMetrics);
- s.setCaching(nbRows);
+ s.setCaching(this.caching);
return s;
}
@@ -262,27 +260,21 @@ public class ClientScanner extends Abstr
Result [] values = null;
long remainingResultSize = maxScannerResultSize;
int countdown = this.caching;
- // We need to reset it if it's a new callable that was created
- // with a countdown in nextScanner
- callable.setCaching(this.caching);
+
// This flag is set when we want to skip the result returned. We do
// this when we reset scanner because it split under us.
boolean skipFirst = false;
boolean retryAfterOutOfOrderException = true;
do {
try {
- if (skipFirst) {
- // Skip only the first row (which was the last row of the last
- // already-processed batch).
- callable.setCaching(1);
- values = callable.withRetries();
- callable.setCaching(this.caching);
- skipFirst = false;
- }
// Server returns a null values if scanning is to stop. Else,
// returns an empty array if scanning is to go on and we've just
// exhausted current region.
values = callable.withRetries();
+ if (skipFirst && values != null && values.length == 1) {
+ skipFirst = false; // Already skipped, unset it before scanning again
+ values = callable.withRetries();
+ }
retryAfterOutOfOrderException = true;
} catch (DoNotRetryIOException e) {
// DNRIOEs are thrown to make us break out of retries. Some types of DNRIOEs want us
@@ -344,7 +336,15 @@ public class ClientScanner extends Abstr
}
lastNext = currentTime;
if (values != null && values.length > 0) {
- for (Result rs : values) {
+ int i = 0;
+ if (skipFirst) {
+ skipFirst = false;
+ // We will cache one row less, which is fine
+ countdown--;
+ i = 1;
+ }
+ for (; i < values.length; i++) {
+ Result rs = values[i];
cache.add(rs);
for (KeyValue kv : rs.raw()) {
remainingResultSize -= kv.heapSize();
@@ -354,7 +354,7 @@ public class ClientScanner extends Abstr
}
}
// Values == null means server-side filter has determined we must STOP
- } while (remainingResultSize > 0 && countdown > 0 && nextScanner(countdown, values == null));
+ } while (remainingResultSize > 0 && countdown > 0 && nextScanner(values == null));
}
if (cache.size() > 0) {
@@ -411,4 +411,12 @@ public class ClientScanner extends Abstr
}
closed = true;
}
+
+ long currentScannerId() {
+ return (callable == null) ? -1L : callable.scannerId;
+ }
+
+ HRegionInfo currentRegionInfo() {
+ return currentRegion;
+ }
}
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java?rev=1486246&r1=1486245&r2=1486246&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java Fri May 24 22:51:22 2013
@@ -116,6 +116,8 @@ public class Scan extends OperationWithA
new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR);
private Boolean loadColumnFamiliesOnDemand = null;
+ private boolean prefetching = true;
+
/**
* Create a Scan operation across all rows.
*/
@@ -168,6 +170,7 @@ public class Scan extends OperationWithA
getScan = scan.isGetScan();
filter = scan.getFilter(); // clone?
loadColumnFamiliesOnDemand = scan.getLoadColumnFamiliesOnDemandValue();
+ prefetching = scan.getPrefetching();
TimeRange ctr = scan.getTimeRange();
tr = new TimeRange(ctr.getMin(), ctr.getMax());
Map<byte[], NavigableSet<byte[]>> fams = scan.getFamilyMap();
@@ -201,6 +204,7 @@ public class Scan extends OperationWithA
this.storeOffset = get.getRowOffsetPerColumnFamily();
this.tr = get.getTimeRange();
this.familyMap = get.getFamilyMap();
+ this.prefetching = false;
this.getScan = true;
}
@@ -364,6 +368,21 @@ public class Scan extends OperationWithA
}
/**
+ * Set if pre-fetching is enabled. If enabled, the region
+ * server will try to read the next scan result ahead of time. This
+ * improves scan performance if we are doing large scans.
+ *
+ * @param enablePrefetching if pre-fetching is enabled or not
+ */
+ public void setPrefetching(boolean enablePrefetching) {
+ this.prefetching = enablePrefetching;
+ }
+
+ public boolean getPrefetching() {
+ return prefetching;
+ }
+
+/**
* @return the maximum result size in bytes. See {@link #setMaxResultSize(long)}
*/
public long getMaxResultSize() {
@@ -613,6 +632,7 @@ public class Scan extends OperationWithA
map.put("maxResultSize", this.maxResultSize);
map.put("cacheBlocks", this.cacheBlocks);
map.put("loadColumnFamiliesOnDemand", this.loadColumnFamiliesOnDemand);
+ map.put("prefetching", this.prefetching);
List<Long> timeRange = new ArrayList<Long>();
timeRange.add(this.tr.getMin());
timeRange.add(this.tr.getMax());
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java?rev=1486246&r1=1486245&r2=1486246&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java Fri May 24 22:51:22 2013
@@ -60,7 +60,7 @@ public class ScannerCallable extends Ser
public static final String LOG_SCANNER_ACTIVITY = "hbase.client.log.scanner.activity";
public static final Log LOG = LogFactory.getLog(ScannerCallable.class);
- private long scannerId = -1L;
+ long scannerId = -1L;
private boolean instantiated = false;
private boolean closed = false;
private Scan scan;
@@ -130,6 +130,7 @@ public class ScannerCallable extends Ser
/**
* @see java.util.concurrent.Callable#call()
*/
+ @SuppressWarnings("deprecation")
public Result [] call() throws IOException {
if (closed) {
if (scannerId != -1) {
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java?rev=1486246&r1=1486245&r2=1486246&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java Fri May 24 22:51:22 2013
@@ -709,6 +709,9 @@ public final class ProtobufUtil {
if (scan.getBatch() > 0) {
scanBuilder.setBatchSize(scan.getBatch());
}
+ if (scan.getCaching() > 0) {
+ scanBuilder.setCachingCount(scan.getCaching());
+ }
if (scan.getMaxResultSize() > 0) {
scanBuilder.setMaxResultSize(scan.getMaxResultSize());
}
@@ -716,6 +719,7 @@ public final class ProtobufUtil {
if (loadColumnFamiliesOnDemand != null) {
scanBuilder.setLoadColumnFamiliesOnDemand(loadColumnFamiliesOnDemand.booleanValue());
}
+ scanBuilder.setPrefetching(scan.getPrefetching());
scanBuilder.setMaxVersions(scan.getMaxVersions());
TimeRange timeRange = scan.getTimeRange();
if (!timeRange.isAllTime()) {
@@ -793,6 +797,9 @@ public final class ProtobufUtil {
if (proto.hasMaxVersions()) {
scan.setMaxVersions(proto.getMaxVersions());
}
+ if (proto.hasPrefetching()) {
+ scan.setPrefetching(proto.getPrefetching());
+ }
if (proto.hasStoreLimit()) {
scan.setMaxResultsPerColumnFamily(proto.getStoreLimit());
}
@@ -821,6 +828,9 @@ public final class ProtobufUtil {
if (proto.hasBatchSize()) {
scan.setBatch(proto.getBatchSize());
}
+ if (proto.hasCachingCount()) {
+ scan.setCaching(proto.getCachingCount());
+ }
if (proto.hasMaxResultSize()) {
scan.setMaxResultSize(proto.getMaxResultSize());
}
Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java?rev=1486246&r1=1486245&r2=1486246&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java Fri May 24 22:51:22 2013
@@ -21,6 +21,9 @@ package org.apache.hadoop.hbase.util;
import java.io.PrintWriter;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -180,8 +183,42 @@ public class Threads {
boundedCachedThreadPool.allowCoreThreadTimeOut(true);
return boundedCachedThreadPool;
}
-
-
+
+ /**
+ * Creates a ThreadPoolExecutor which has a bound on the number of tasks that can be
+ * submitted to it, determined by the blockingLimit parameter. Excess tasks
+ * submitted will block on the calling thread till space frees up.
+ *
+ * @param blockingLimit max number of tasks that can be submitted
+ * @param timeout time value after which unused threads are killed
+ * @param unit time unit for killing unused threads
+ * @param threadFactory thread factory to use to spawn threads
+ * @return the ThreadPoolExecutor
+ */
+ public static ThreadPoolExecutor getBlockingThreadPool(
+ int blockingLimit, long timeout, TimeUnit unit,
+ ThreadFactory threadFactory) {
+ ThreadPoolExecutor blockingThreadPool =
+ new ThreadPoolExecutor(
+ 1, blockingLimit, timeout, TimeUnit.SECONDS,
+ new SynchronousQueue<Runnable>(),
+ threadFactory,
+ new RejectedExecutionHandler() {
+ @Override
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+ try {
+ // The submitting thread will block until the thread pool frees up.
+ executor.getQueue().put(r);
+ } catch (InterruptedException e) {
+ throw new RejectedExecutionException(
+ "Failed to requeue the rejected request because of ", e);
+ }
+ }
+ });
+ blockingThreadPool.allowCoreThreadTimeOut(true);
+ return blockingThreadPool;
+ }
+
/**
* Returns a {@link java.util.concurrent.ThreadFactory} that names each created thread uniquely,
* with a common prefix.
Modified: hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java?rev=1486246&r1=1486245&r2=1486246&view=diff
==============================================================================
--- hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java (original)
+++ hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java Fri May 24 22:51:22 2013
@@ -10622,6 +10622,14 @@ public final class ClientProtos {
// optional bool loadColumnFamiliesOnDemand = 13;
boolean hasLoadColumnFamiliesOnDemand();
boolean getLoadColumnFamiliesOnDemand();
+
+ // optional uint32 cachingCount = 14;
+ boolean hasCachingCount();
+ int getCachingCount();
+
+ // optional bool prefetching = 15;
+ boolean hasPrefetching();
+ boolean getPrefetching();
}
public static final class Scan extends
com.google.protobuf.GeneratedMessage
@@ -10810,6 +10818,26 @@ public final class ClientProtos {
return loadColumnFamiliesOnDemand_;
}
+ // optional uint32 cachingCount = 14;
+ public static final int CACHINGCOUNT_FIELD_NUMBER = 14;
+ private int cachingCount_;
+ public boolean hasCachingCount() {
+ return ((bitField0_ & 0x00000800) == 0x00000800);
+ }
+ public int getCachingCount() {
+ return cachingCount_;
+ }
+
+ // optional bool prefetching = 15;
+ public static final int PREFETCHING_FIELD_NUMBER = 15;
+ private boolean prefetching_;
+ public boolean hasPrefetching() {
+ return ((bitField0_ & 0x00001000) == 0x00001000);
+ }
+ public boolean getPrefetching() {
+ return prefetching_;
+ }
+
private void initFields() {
column_ = java.util.Collections.emptyList();
attribute_ = java.util.Collections.emptyList();
@@ -10824,6 +10852,8 @@ public final class ClientProtos {
storeLimit_ = 0;
storeOffset_ = 0;
loadColumnFamiliesOnDemand_ = false;
+ cachingCount_ = 0;
+ prefetching_ = false;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -10894,6 +10924,12 @@ public final class ClientProtos {
if (((bitField0_ & 0x00000400) == 0x00000400)) {
output.writeBool(13, loadColumnFamiliesOnDemand_);
}
+ if (((bitField0_ & 0x00000800) == 0x00000800)) {
+ output.writeUInt32(14, cachingCount_);
+ }
+ if (((bitField0_ & 0x00001000) == 0x00001000)) {
+ output.writeBool(15, prefetching_);
+ }
getUnknownFields().writeTo(output);
}
@@ -10955,6 +10991,14 @@ public final class ClientProtos {
size += com.google.protobuf.CodedOutputStream
.computeBoolSize(13, loadColumnFamiliesOnDemand_);
}
+ if (((bitField0_ & 0x00000800) == 0x00000800)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeUInt32Size(14, cachingCount_);
+ }
+ if (((bitField0_ & 0x00001000) == 0x00001000)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBoolSize(15, prefetching_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -11037,6 +11081,16 @@ public final class ClientProtos {
result = result && (getLoadColumnFamiliesOnDemand()
== other.getLoadColumnFamiliesOnDemand());
}
+ result = result && (hasCachingCount() == other.hasCachingCount());
+ if (hasCachingCount()) {
+ result = result && (getCachingCount()
+ == other.getCachingCount());
+ }
+ result = result && (hasPrefetching() == other.hasPrefetching());
+ if (hasPrefetching()) {
+ result = result && (getPrefetching()
+ == other.getPrefetching());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -11098,6 +11152,14 @@ public final class ClientProtos {
hash = (37 * hash) + LOADCOLUMNFAMILIESONDEMAND_FIELD_NUMBER;
hash = (53 * hash) + hashBoolean(getLoadColumnFamiliesOnDemand());
}
+ if (hasCachingCount()) {
+ hash = (37 * hash) + CACHINGCOUNT_FIELD_NUMBER;
+ hash = (53 * hash) + getCachingCount();
+ }
+ if (hasPrefetching()) {
+ hash = (37 * hash) + PREFETCHING_FIELD_NUMBER;
+ hash = (53 * hash) + hashBoolean(getPrefetching());
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
return hash;
}
@@ -11260,6 +11322,10 @@ public final class ClientProtos {
bitField0_ = (bitField0_ & ~0x00000800);
loadColumnFamiliesOnDemand_ = false;
bitField0_ = (bitField0_ & ~0x00001000);
+ cachingCount_ = 0;
+ bitField0_ = (bitField0_ & ~0x00002000);
+ prefetching_ = false;
+ bitField0_ = (bitField0_ & ~0x00004000);
return this;
}
@@ -11368,6 +11434,14 @@ public final class ClientProtos {
to_bitField0_ |= 0x00000400;
}
result.loadColumnFamiliesOnDemand_ = loadColumnFamiliesOnDemand_;
+ if (((from_bitField0_ & 0x00002000) == 0x00002000)) {
+ to_bitField0_ |= 0x00000800;
+ }
+ result.cachingCount_ = cachingCount_;
+ if (((from_bitField0_ & 0x00004000) == 0x00004000)) {
+ to_bitField0_ |= 0x00001000;
+ }
+ result.prefetching_ = prefetching_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -11469,6 +11543,12 @@ public final class ClientProtos {
if (other.hasLoadColumnFamiliesOnDemand()) {
setLoadColumnFamiliesOnDemand(other.getLoadColumnFamiliesOnDemand());
}
+ if (other.hasCachingCount()) {
+ setCachingCount(other.getCachingCount());
+ }
+ if (other.hasPrefetching()) {
+ setPrefetching(other.getPrefetching());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -11593,6 +11673,16 @@ public final class ClientProtos {
loadColumnFamiliesOnDemand_ = input.readBool();
break;
}
+ case 112: {
+ bitField0_ |= 0x00002000;
+ cachingCount_ = input.readUInt32();
+ break;
+ }
+ case 120: {
+ bitField0_ |= 0x00004000;
+ prefetching_ = input.readBool();
+ break;
+ }
}
}
}
@@ -12346,6 +12436,48 @@ public final class ClientProtos {
return this;
}
+ // optional uint32 cachingCount = 14;
+ private int cachingCount_ ;
+ public boolean hasCachingCount() {
+ return ((bitField0_ & 0x00002000) == 0x00002000);
+ }
+ public int getCachingCount() {
+ return cachingCount_;
+ }
+ public Builder setCachingCount(int value) {
+ bitField0_ |= 0x00002000;
+ cachingCount_ = value;
+ onChanged();
+ return this;
+ }
+ public Builder clearCachingCount() {
+ bitField0_ = (bitField0_ & ~0x00002000);
+ cachingCount_ = 0;
+ onChanged();
+ return this;
+ }
+
+ // optional bool prefetching = 15;
+ private boolean prefetching_ ;
+ public boolean hasPrefetching() {
+ return ((bitField0_ & 0x00004000) == 0x00004000);
+ }
+ public boolean getPrefetching() {
+ return prefetching_;
+ }
+ public Builder setPrefetching(boolean value) {
+ bitField0_ |= 0x00004000;
+ prefetching_ = value;
+ onChanged();
+ return this;
+ }
+ public Builder clearPrefetching() {
+ bitField0_ = (bitField0_ & ~0x00004000);
+ prefetching_ = false;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:Scan)
}
@@ -21467,7 +21599,7 @@ public final class ClientProtos {
"ation\030\002 \002(\0132\016.MutationProto\022\035\n\tcondition" +
"\030\003 \001(\0132\n.Condition\"<\n\016MutateResponse\022\027\n\006" +
"result\030\001 \001(\0132\007.Result\022\021\n\tprocessed\030\002 \001(\010" +
- "\"\307\002\n\004Scan\022\027\n\006column\030\001 \003(\0132\007.Column\022!\n\tat" +
+ "\"\362\002\n\004Scan\022\027\n\006column\030\001 \003(\0132\007.Column\022!\n\tat" +
"tribute\030\002 \003(\0132\016.NameBytesPair\022\020\n\010startRo" +
"w\030\003 \001(\014\022\017\n\007stopRow\030\004 \001(\014\022\027\n\006filter\030\005 \001(\013" +
"2\007.Filter\022\035\n\ttimeRange\030\006 \001(\0132\n.TimeRange" +
@@ -21475,45 +21607,46 @@ public final class ClientProtos {
"\010 \001(\010:\004true\022\021\n\tbatchSize\030\t \001(\r\022\025\n\rmaxRes" +
"ultSize\030\n \001(\004\022\022\n\nstoreLimit\030\013 \001(\r\022\023\n\013sto" +
"reOffset\030\014 \001(\r\022\"\n\032loadColumnFamiliesOnDe" +
- "mand\030\r \001(\010\"\230\001\n\013ScanRequest\022 \n\006region\030\001 \001" +
- "(\0132\020.RegionSpecifier\022\023\n\004scan\030\002 \001(\0132\005.Sca" +
- "n\022\021\n\tscannerId\030\003 \001(\004\022\024\n\014numberOfRows\030\004 \001" +
- "(\r\022\024\n\014closeScanner\030\005 \001(\010\022\023\n\013nextCallSeq\030" +
- "\006 \001(\004\"l\n\014ScanResponse\022\'\n\016resultCellMeta\030" +
- "\001 \001(\0132\017.ResultCellMeta\022\021\n\tscannerId\030\002 \001(" +
- "\004\022\023\n\013moreResults\030\003 \001(\010\022\013\n\003ttl\030\004 \001(\r\"%\n\016R",
- "esultCellMeta\022\023\n\013cellsLength\030\001 \003(\r\"\260\001\n\024B" +
- "ulkLoadHFileRequest\022 \n\006region\030\001 \002(\0132\020.Re" +
- "gionSpecifier\0224\n\nfamilyPath\030\002 \003(\0132 .Bulk" +
- "LoadHFileRequest.FamilyPath\022\024\n\014assignSeq" +
- "Num\030\003 \001(\010\032*\n\nFamilyPath\022\016\n\006family\030\001 \002(\014\022" +
- "\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFileResponse\022\016" +
- "\n\006loaded\030\001 \002(\010\"_\n\026CoprocessorServiceCall" +
- "\022\013\n\003row\030\001 \002(\014\022\023\n\013serviceName\030\002 \002(\t\022\022\n\nme" +
- "thodName\030\003 \002(\t\022\017\n\007request\030\004 \002(\014\"d\n\031Copro" +
- "cessorServiceRequest\022 \n\006region\030\001 \002(\0132\020.R",
- "egionSpecifier\022%\n\004call\030\002 \002(\0132\027.Coprocess" +
- "orServiceCall\"]\n\032CoprocessorServiceRespo" +
- "nse\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\035\n" +
- "\005value\030\002 \002(\0132\016.NameBytesPair\"B\n\013MultiAct" +
- "ion\022 \n\010mutation\030\001 \001(\0132\016.MutationProto\022\021\n" +
- "\003get\030\002 \001(\0132\004.Get\"I\n\014ActionResult\022\026\n\005valu" +
- "e\030\001 \001(\0132\007.Result\022!\n\texception\030\002 \001(\0132\016.Na" +
- "meBytesPair\"^\n\014MultiRequest\022 \n\006region\030\001 " +
- "\002(\0132\020.RegionSpecifier\022\034\n\006action\030\002 \003(\0132\014." +
- "MultiAction\022\016\n\006atomic\030\003 \001(\010\".\n\rMultiResp",
- "onse\022\035\n\006result\030\001 \003(\0132\r.ActionResult2\342\002\n\r" +
- "ClientService\022 \n\003get\022\013.GetRequest\032\014.GetR" +
- "esponse\022/\n\010multiGet\022\020.MultiGetRequest\032\021." +
- "MultiGetResponse\022)\n\006mutate\022\016.MutateReque" +
- "st\032\017.MutateResponse\022#\n\004scan\022\014.ScanReques" +
- "t\032\r.ScanResponse\022>\n\rbulkLoadHFile\022\025.Bulk" +
- "LoadHFileRequest\032\026.BulkLoadHFileResponse" +
- "\022F\n\013execService\022\032.CoprocessorServiceRequ" +
- "est\032\033.CoprocessorServiceResponse\022&\n\005mult" +
- "i\022\r.MultiRequest\032\016.MultiResponseBB\n*org.",
- "apache.hadoop.hbase.protobuf.generatedB\014" +
- "ClientProtosH\001\210\001\001\240\001\001"
+ "mand\030\r \001(\010\022\024\n\014cachingCount\030\016 \001(\r\022\023\n\013pref" +
+ "etching\030\017 \001(\010\"\230\001\n\013ScanRequest\022 \n\006region\030" +
+ "\001 \001(\0132\020.RegionSpecifier\022\023\n\004scan\030\002 \001(\0132\005." +
+ "Scan\022\021\n\tscannerId\030\003 \001(\004\022\024\n\014numberOfRows\030" +
+ "\004 \001(\r\022\024\n\014closeScanner\030\005 \001(\010\022\023\n\013nextCallS" +
+ "eq\030\006 \001(\004\"l\n\014ScanResponse\022\'\n\016resultCellMe" +
+ "ta\030\001 \001(\0132\017.ResultCellMeta\022\021\n\tscannerId\030\002",
+ " \001(\004\022\023\n\013moreResults\030\003 \001(\010\022\013\n\003ttl\030\004 \001(\r\"%" +
+ "\n\016ResultCellMeta\022\023\n\013cellsLength\030\001 \003(\r\"\260\001" +
+ "\n\024BulkLoadHFileRequest\022 \n\006region\030\001 \002(\0132\020" +
+ ".RegionSpecifier\0224\n\nfamilyPath\030\002 \003(\0132 .B" +
+ "ulkLoadHFileRequest.FamilyPath\022\024\n\014assign" +
+ "SeqNum\030\003 \001(\010\032*\n\nFamilyPath\022\016\n\006family\030\001 \002" +
+ "(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFileRespons" +
+ "e\022\016\n\006loaded\030\001 \002(\010\"_\n\026CoprocessorServiceC" +
+ "all\022\013\n\003row\030\001 \002(\014\022\023\n\013serviceName\030\002 \002(\t\022\022\n" +
+ "\nmethodName\030\003 \002(\t\022\017\n\007request\030\004 \002(\014\"d\n\031Co",
+ "processorServiceRequest\022 \n\006region\030\001 \002(\0132" +
+ "\020.RegionSpecifier\022%\n\004call\030\002 \002(\0132\027.Coproc" +
+ "essorServiceCall\"]\n\032CoprocessorServiceRe" +
+ "sponse\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier" +
+ "\022\035\n\005value\030\002 \002(\0132\016.NameBytesPair\"B\n\013Multi" +
+ "Action\022 \n\010mutation\030\001 \001(\0132\016.MutationProto" +
+ "\022\021\n\003get\030\002 \001(\0132\004.Get\"I\n\014ActionResult\022\026\n\005v" +
+ "alue\030\001 \001(\0132\007.Result\022!\n\texception\030\002 \001(\0132\016" +
+ ".NameBytesPair\"^\n\014MultiRequest\022 \n\006region" +
+ "\030\001 \002(\0132\020.RegionSpecifier\022\034\n\006action\030\002 \003(\013",
+ "2\014.MultiAction\022\016\n\006atomic\030\003 \001(\010\".\n\rMultiR" +
+ "esponse\022\035\n\006result\030\001 \003(\0132\r.ActionResult2\342" +
+ "\002\n\rClientService\022 \n\003get\022\013.GetRequest\032\014.G" +
+ "etResponse\022/\n\010multiGet\022\020.MultiGetRequest" +
+ "\032\021.MultiGetResponse\022)\n\006mutate\022\016.MutateRe" +
+ "quest\032\017.MutateResponse\022#\n\004scan\022\014.ScanReq" +
+ "uest\032\r.ScanResponse\022>\n\rbulkLoadHFile\022\025.B" +
+ "ulkLoadHFileRequest\032\026.BulkLoadHFileRespo" +
+ "nse\022F\n\013execService\022\032.CoprocessorServiceR" +
+ "equest\032\033.CoprocessorServiceResponse\022&\n\005m",
+ "ulti\022\r.MultiRequest\032\016.MultiResponseBB\n*o" +
+ "rg.apache.hadoop.hbase.protobuf.generate" +
+ "dB\014ClientProtosH\001\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -21629,7 +21762,7 @@ public final class ClientProtos {
internal_static_Scan_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_Scan_descriptor,
- new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", },
+ new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", "CachingCount", "Prefetching", },
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.class,
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.Builder.class);
internal_static_ScanRequest_descriptor =
Modified: hbase/trunk/hbase-protocol/src/main/protobuf/Client.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-protocol/src/main/protobuf/Client.proto?rev=1486246&r1=1486245&r2=1486246&view=diff
==============================================================================
--- hbase/trunk/hbase-protocol/src/main/protobuf/Client.proto (original)
+++ hbase/trunk/hbase-protocol/src/main/protobuf/Client.proto Fri May 24 22:51:22 2013
@@ -233,6 +233,8 @@ message Scan {
optional uint32 storeLimit = 11;
optional uint32 storeOffset = 12;
optional bool loadColumnFamiliesOnDemand = 13; /* DO NOT add defaults to loadColumnFamiliesOnDemand. */
+ optional uint32 cachingCount = 14;
+ optional bool prefetching = 15;
}
/**
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1486246&r1=1486245&r2=1486246&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Fri May 24 22:51:22 2013
@@ -185,6 +185,7 @@ import com.google.protobuf.Service;
* defines the keyspace for this HRegion.
*/
@InterfaceAudience.Private
+@SuppressWarnings("deprecation")
public class HRegion implements HeapSize { // , Writable{
public static final Log LOG = LogFactory.getLog(HRegion.class);
@@ -3603,7 +3604,6 @@ public class HRegion implements HeapSize
return returnResult;
}
-
private void populateFromJoinedHeap(List<KeyValue> results, int limit)
throws IOException {
assert joinedContinuationRow != null;
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1486246&r1=1486245&r2=1486246&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Fri May 24 22:51:22 2013
@@ -45,6 +45,8 @@ import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.management.ObjectName;
@@ -59,12 +61,12 @@ import org.apache.hadoop.hbase.CellScann
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Chore;
+import org.apache.hadoop.hbase.DaemonThreadFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HealthCheckChore;
-import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
@@ -164,8 +166,8 @@ import org.apache.hadoop.hbase.protobuf.
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultCellMeta;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultCellMeta;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
@@ -480,6 +482,11 @@ public class HRegionServer implements Cl
private TableLockManager tableLockManager;
/**
+ * Threadpool for doing scanner prefetches
+ */
+ protected ThreadPoolExecutor scanPrefetchThreadPool;
+
+ /**
* Starts a HRegionServer at the default location
*
* @param conf
@@ -616,14 +623,18 @@ public class HRegionServer implements Cl
}
RegionScanner getScanner(long scannerId) {
- String scannerIdString = Long.toString(scannerId);
- RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
+ RegionScannerHolder scannerHolder = getScannerHolder(scannerId);
if (scannerHolder != null) {
- return scannerHolder.s;
+ return scannerHolder.scanner;
}
return null;
}
+ public RegionScannerHolder getScannerHolder(long scannerId) {
+ String scannerIdString = Long.toString(scannerId);
+ return scanners.get(scannerIdString);
+ }
+
/**
* All initialization needed before we go register with Master.
*
@@ -837,6 +848,11 @@ public class HRegionServer implements Cl
if (this.thriftServer != null) this.thriftServer.shutdown();
this.leases.closeAfterLeasesExpire();
this.rpcServer.stop();
+
+ if (scanPrefetchThreadPool != null) {
+ // shutdown the prefetch threads
+ scanPrefetchThreadPool.shutdownNow();
+ }
if (this.splitLogWorker != null) {
splitLogWorker.stop();
}
@@ -1107,7 +1123,7 @@ public class HRegionServer implements Cl
// exception next time they come in.
for (Map.Entry<String, RegionScannerHolder> e : this.scanners.entrySet()) {
try {
- e.getValue().s.close();
+ e.getValue().closeScanner();
} catch (IOException ioe) {
LOG.warn("Closing scanner " + e.getKey(), ioe);
}
@@ -1537,6 +1553,14 @@ public class HRegionServer implements Cl
this.replicationSinkHandler.startReplicationService();
}
+ // start the scanner prefetch threadpool
+ int numHandlers = conf.getInt("hbase.regionserver.prefetcher.threads.max",
+ conf.getInt("hbase.regionserver.handler.count", 10)
+ + conf.getInt("hbase.regionserver.metahandler.count", 10));
+ scanPrefetchThreadPool =
+ Threads.getBlockingThreadPool(numHandlers, 60, TimeUnit.SECONDS,
+ new DaemonThreadFactory(RegionScannerHolder.PREFETCHER_THREAD_PREFIX));
+
// Start Server. This service is like leases in that it internally runs
// a thread.
this.rpcServer.start();
@@ -1831,8 +1855,7 @@ public class HRegionServer implements Cl
continue;
}
- InetSocketAddress isa =
- new InetSocketAddress(sn.getHostname(), sn.getPort());
+ new InetSocketAddress(sn.getHostname(), sn.getPort());
LOG.info("Attempting connect to Master server at " +
this.masterAddressManager.getMasterAddress());
@@ -2325,7 +2348,7 @@ public class HRegionServer implements Cl
public void leaseExpired() {
RegionScannerHolder rsh = scanners.remove(this.scannerName);
if (rsh != null) {
- RegionScanner s = rsh.s;
+ RegionScanner s = rsh.scanner;
LOG.info("Scanner " + this.scannerName + " lease expired on region "
+ s.getRegionInfo().getRegionNameAsString());
try {
@@ -2334,7 +2357,7 @@ public class HRegionServer implements Cl
region.getCoprocessorHost().preScannerClose(s);
}
- s.close();
+ rsh.closeScanner();
if (region != null && region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postScannerClose(s);
}
@@ -2638,20 +2661,22 @@ public class HRegionServer implements Cl
return this.fsOk;
}
- protected long addScanner(RegionScanner s) throws LeaseStillHeldException {
+ protected RegionScannerHolder addScanner(
+ RegionScanner s, HRegion r) throws LeaseStillHeldException {
+ RegionScannerHolder holder = new RegionScannerHolder(this, s, r);
+ String scannerName = null;
long scannerId = -1;
while (true) {
- scannerId = rand.nextLong();
- if (scannerId == -1) continue;
- String scannerName = String.valueOf(scannerId);
- RegionScannerHolder existing = scanners.putIfAbsent(scannerName, new RegionScannerHolder(s));
+ scannerId = nextLong();
+ scannerName = String.valueOf(scannerId);
+ RegionScannerHolder existing = scanners.putIfAbsent(scannerName, holder);
if (existing == null) {
+ holder.scannerName = scannerName;
this.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
- new ScannerListener(scannerName));
- break;
+ new ScannerListener(scannerName));
+ return holder;
}
}
- return scannerId;
}
/**
@@ -2913,7 +2938,6 @@ public class HRegionServer implements Cl
@Override
public ScanResponse scan(final RpcController controller,
final ScanRequest request) throws ServiceException {
- Leases.Lease lease = null;
String scannerName = null;
try {
if (!request.hasScannerId() && !request.hasScan()) {
@@ -2963,7 +2987,10 @@ public class HRegionServer implements Cl
throw new UnknownScannerException(
"Name: " + scannerName + ", already closed?");
}
- scanner = rsh.s;
+ scanner = rsh.scanner;
+ // Use the region found in the online region list,
+ // not that one in the RegionScannerHolder. So that we can
+ // make sure the region is still open in this region server.
region = getRegion(scanner.getRegionInfo().getRegionName());
} else {
region = getRegion(request.getRegion());
@@ -2974,7 +3001,6 @@ public class HRegionServer implements Cl
if (!isLoadingCfsOnDemandSet) {
scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
}
- byte[] hasMetrics = scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE);
region.prepareScanner(scan);
if (region.getCoprocessorHost() != null) {
scanner = region.getCoprocessorHost().preScannerOpen(scan);
@@ -2985,9 +3011,14 @@ public class HRegionServer implements Cl
if (region.getCoprocessorHost() != null) {
scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
}
- scannerId = addScanner(scanner);
- scannerName = String.valueOf(scannerId);
+ rsh = addScanner(scanner, region);
+ scannerName = rsh.scannerName;
+ scannerId = Long.parseLong(scannerName);
+
ttl = this.scannerLeaseTimeoutPeriod;
+ if (scan.getPrefetching()) {
+ rsh.enablePrefetching(scan.getCaching());
+ }
}
if (rows > 0) {
@@ -2995,110 +3026,34 @@ public class HRegionServer implements Cl
// performed even before checking of Lease.
// See HBASE-5974
if (request.hasNextCallSeq()) {
- if (rsh == null) {
- rsh = scanners.get(scannerName);
- }
- if (rsh != null) {
- if (request.getNextCallSeq() != rsh.nextCallSeq) {
- throw new OutOfOrderScannerNextException("Expected nextCallSeq: " + rsh.nextCallSeq
- + " But the nextCallSeq got from client: " + request.getNextCallSeq() +
- "; request=" + TextFormat.shortDebugString(request));
- }
- // Increment the nextCallSeq value which is the next expected from client.
- rsh.nextCallSeq++;
+ if (request.getNextCallSeq() != rsh.nextCallSeq) {
+ throw new OutOfOrderScannerNextException("Expected nextCallSeq: " + rsh.nextCallSeq
+ + " But the nextCallSeq got from client: " + request.getNextCallSeq() +
+ "; request=" + TextFormat.shortDebugString(request));
}
+ // Increment the nextCallSeq value which is the next expected from client.
+ rsh.nextCallSeq++;
}
- try {
- // Remove lease while its being processed in server; protects against case
- // where processing of request takes > lease expiration time.
- lease = leases.removeLease(scannerName);
- List<Result> results = new ArrayList<Result>(rows);
- long currentScanResultSize = 0;
-
- boolean done = false;
- // Call coprocessor. Get region info from scanner.
- if (region != null && region.getCoprocessorHost() != null) {
- Boolean bypass = region.getCoprocessorHost().preScannerNext(
- scanner, results, rows);
- if (!results.isEmpty()) {
- for (Result r : results) {
- if (maxScannerResultSize < Long.MAX_VALUE){
- for (KeyValue kv : r.raw()) {
- currentScanResultSize += kv.heapSize();
- }
- }
- }
- }
- if (bypass != null && bypass.booleanValue()) {
- done = true;
- }
- }
- if (!done) {
- long maxResultSize = scanner.getMaxResultSize();
- if (maxResultSize <= 0) {
- maxResultSize = maxScannerResultSize;
- }
- List<KeyValue> values = new ArrayList<KeyValue>();
- MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint());
- region.startRegionOperation(Operation.SCAN);
- try {
- int i = 0;
- synchronized(scanner) {
- for (; i < rows
- && currentScanResultSize < maxResultSize; i++) {
- // Collect values to be returned here
- boolean moreRows = scanner.nextRaw(values);
- if (!values.isEmpty()) {
- if (maxScannerResultSize < Long.MAX_VALUE){
- for (KeyValue kv : values) {
- currentScanResultSize += kv.heapSize();
- }
- }
- results.add(new Result(values));
- }
- if (!moreRows) {
- break;
- }
- values.clear();
- }
- }
- region.readRequestsCount.add(i);
- } finally {
- region.closeRegionOperation();
- }
-
- // coprocessor postNext hook
- if (region != null && region.getCoprocessorHost() != null) {
- region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
- }
- }
+ ttl = this.scannerLeaseTimeoutPeriod;
+ ScanResult result = rsh.getScanResult(rows);
+ if (result.isException) {
+ throw result.ioException;
+ }
- // If the scanner's filter - if any - is done with the scan
- // and wants to tell the client to stop the scan. This is done by passing
- // a null result, and setting moreResults to false.
- if (scanner.isFilterDone() && results.isEmpty()) {
- moreResults = false;
- results = null;
- } else {
- ResultCellMeta.Builder rcmBuilder = ResultCellMeta.newBuilder();
- List<CellScannable> cellScannables = new ArrayList<CellScannable>(results.size());
- for (Result res : results) {
- cellScannables.add(res);
- rcmBuilder.addCellsLength(res.size());
- }
- builder.setResultCellMeta(rcmBuilder.build());
- // TODO is this okey to assume the type and cast
- ((PayloadCarryingRpcController) controller).setCellScanner(CellUtil
- .createCellScanner(cellScannables));
- }
- } finally {
- // We're done. On way out re-add the above removed lease.
- // Adding resets expiration time on lease.
- if (scanners.containsKey(scannerName)) {
- if (lease != null) leases.addLease(lease);
- ttl = this.scannerLeaseTimeoutPeriod;
+ moreResults = result.moreResults;
+ if (result.results != null) {
+ List<CellScannable> cellScannables =
+ new ArrayList<CellScannable>(result.results.size());
+ ResultCellMeta.Builder rcmBuilder = ResultCellMeta.newBuilder();
+ for (Result res : result.results) {
+ cellScannables.add(res);
+ rcmBuilder.addCellsLength(res.size());
}
+ builder.setResultCellMeta(rcmBuilder.build());
+ // TODO is this okey to assume the type and cast
+ ((PayloadCarryingRpcController) controller).setCellScanner(CellUtil
+ .createCellScanner(cellScannables));
}
}
@@ -3112,9 +3067,13 @@ public class HRegionServer implements Cl
}
rsh = scanners.remove(scannerName);
if (rsh != null) {
- scanner = rsh.s;
- scanner.close();
- leases.cancelLease(scannerName);
+ rsh.closeScanner();
+ try {
+ leases.cancelLease(scannerName);
+ } catch (LeaseException le) {
+ // That's ok, since the lease may be gone with
+ // the prefetcher when cancelled.
+ }
if (region != null && region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postScannerClose(scanner);
}
@@ -4181,18 +4140,6 @@ public class HRegionServer implements Cl
return ZKUtil.joinZNode(this.zooKeeper.rsZNode, getServerName().toString());
}
- /**
- * Holder class which holds the RegionScanner and nextCallSeq together.
- */
- private static class RegionScannerHolder {
- private RegionScanner s;
- private long nextCallSeq = 0L;
-
- public RegionScannerHolder(RegionScanner s) {
- this.s = s;
- }
- }
-
private boolean isHealthCheckerConfigured() {
String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
return org.apache.commons.lang.StringUtils.isNotBlank(healthScriptLocation);
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerHolder.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerHolder.java?rev=1486246&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerHolder.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerHolder.java Fri May 24 22:51:22 2013
@@ -0,0 +1,394 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Holder class which holds the RegionScanner, nextCallSeq, ScanPrefetcher
+ * and information needed for prefetcher/fetcher.
+ *
+ * Originally, this is an inner class of HRegionServer. We moved it out
+ * since HRegionServer is getting bigger and bigger.
+ */
+@InterfaceAudience.Private
+public class RegionScannerHolder {
+ public final static String MAX_PREFETCHED_RESULT_SIZE_KEY
+ = "hbase.hregionserver.prefetcher.resultsize.max";
+ public final static int MAX_PREFETCHED_RESULT_SIZE_DEFAULT = 256 * 1024 * 1024;
+
+ final static Log LOG = LogFactory.getLog(RegionScannerHolder.class);
+ final static String PREFETCHER_THREAD_PREFIX = "scan-prefetch-";
+
+ private final static AtomicLong globalPrefetchedResultSize = new AtomicLong();
+
+ private ThreadPoolExecutor scanPrefetchThreadPool;
+ private Map<String, RegionScannerHolder> scanners;
+ private long maxScannerResultSize;
+ private Configuration conf;
+ private Leases leases;
+
+ private boolean prefetching = false;
+ private long maxGlobalPrefetchedResultSize;
+ private volatile Future<ScanResult> prefetchScanFuture;
+ private volatile long prefetchedResultSize;
+ private ScanPrefetcher prefetcher;
+ private HRegion region;
+ private int rows;
+
+ RegionScanner scanner;
+ long nextCallSeq = 0L;
+ String scannerName;
+
+ /**
+ * Get the total size of all prefetched results not retrieved yet.
+ */
+ public static long getPrefetchedResultSize() {
+ return globalPrefetchedResultSize.get();
+ }
+
+ /**
+ * Construct a RegionScanner holder for a specific region server.
+ *
+ * @param rs the region server the specific region is on
+ * @param s the scanner to be held
+ * @param r the region the scanner is for
+ */
+ RegionScannerHolder(HRegionServer rs, RegionScanner s, HRegion r) {
+ scanPrefetchThreadPool = rs.scanPrefetchThreadPool;
+ maxScannerResultSize = rs.maxScannerResultSize;
+ prefetcher = new ScanPrefetcher();
+ scanners = rs.scanners;
+ leases = rs.leases;
+ conf = rs.conf;
+ scanner = s;
+ region = r;
+ }
+
+ public boolean isPrefetchSubmitted() {
+ return prefetchScanFuture != null;
+ }
+
+ public HRegionInfo getRegionInfo() {
+ return region.getRegionInfo();
+ }
+
+ /**
+ * Find the current prefetched result size
+ */
+ public long currentPrefetchedResultSize() {
+ return prefetchedResultSize;
+ }
+
+ /**
+ * Wait till current prefetching task complete,
+ * return true if any data retrieved, false otherwise.
+ * Used for unit testing only.
+ */
+ public boolean waitForPrefetchingDone() {
+ if (prefetchScanFuture != null) {
+ try {
+ ScanResult scanResult = prefetchScanFuture.get();
+ return scanResult != null && scanResult.results != null
+ && !scanResult.results.isEmpty();
+ } catch (Throwable t) {
+ LOG.debug("Got exception in getting scan result", t);
+ if (t instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Stop any prefetching task and close the scanner.
+ * @throws IOException
+ */
+ public void closeScanner() throws IOException {
+ // stop prefetcher if needed.
+ if (prefetchScanFuture != null) {
+ synchronized (prefetcher) {
+ prefetcher.scannerClosing = true;
+ prefetchScanFuture.cancel(false);
+ }
+ prefetchScanFuture = null;
+ if (prefetchedResultSize > 0) {
+ globalPrefetchedResultSize.addAndGet(-prefetchedResultSize);
+ prefetchedResultSize = 0L;
+ }
+ }
+ scanner.close();
+ }
+
+ /**
+ * Get the prefetched scan result, if any. Otherwise,
+ * do a scan synchronously and return the result, which
+ * may take some time. Region scan coprocessor, if specified,
+ * is invoked properly, which may override the scan result.
+ *
+ * @param rows the number of rows to scan, which is preferred
+ * not to change among scanner.next() calls.
+ *
+ * @return scan result, which has the data retrieved from
+ * the scanner, or some IOException if the scan failed.
+ * @throws IOException if failed to retrieve from the scanner.
+ */
+ public ScanResult getScanResult(final int rows) throws IOException {
+ Preconditions.checkArgument(rows > 0, "Number of rows requested must be positive");
+ ScanResult scanResult = null;
+ this.rows = rows;
+
+ if (prefetchScanFuture == null) {
+ // Need to scan inline if not prefetched
+ scanResult = prefetcher.call();
+ } else {
+ // if we have a prefetched result, then use it
+ try {
+ scanResult = prefetchScanFuture.get();
+ if (scanResult.moreResults) {
+ int prefetchedRows = scanResult.results.size();
+ if (prefetchedRows != 0 && this.rows > prefetchedRows) {
+ // Try to scan more since we haven't prefetched enough
+ this.rows -= prefetchedRows;
+ ScanResult tmp = prefetcher.call();
+ if (tmp.isException) {
+ return tmp; // Keep the prefetched results for later
+ }
+ if (tmp.results != null && !tmp.results.isEmpty()) {
+ // Merge new results to the old result list
+ scanResult.results.addAll(tmp.results);
+ }
+ // Reset rows for next prefetching
+ this.rows = rows;
+ }
+ }
+ prefetchScanFuture = null;
+ if (prefetchedResultSize > 0) {
+ globalPrefetchedResultSize.addAndGet(-prefetchedResultSize);
+ prefetchedResultSize = 0L;
+ }
+ } catch (ExecutionException ee) {
+ throw new IOException("failed to run prefetching task", ee.getCause());
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ IOException iie = new InterruptedIOException("scan was interrupted");
+ iie.initCause(ie);
+ throw iie;
+ }
+ }
+
+ if (prefetching
+ && scanResult.moreResults && !scanResult.results.isEmpty()) {
+ long totalPrefetchedResultSize = globalPrefetchedResultSize.get();
+ if (totalPrefetchedResultSize < maxGlobalPrefetchedResultSize) {
+ // Schedule a background prefetch for the next result
+ // if prefetch is enabled on scans and there are more results
+ prefetchScanFuture = scanPrefetchThreadPool.submit(prefetcher);
+ } else if (LOG.isTraceEnabled()) {
+ LOG.trace("One prefetching is skipped for scanner " + scannerName
+ + " since total prefetched result size " + totalPrefetchedResultSize
+ + " is more than the maximum configured "
+ + maxGlobalPrefetchedResultSize);
+ }
+ }
+ return scanResult;
+ }
+
+ /**
+ * Set the rows to prefetch, and start the prefetching task.
+ */
+ public void enablePrefetching(int caching) {
+ if (caching > 0) {
+ rows = caching;
+ } else {
+ rows = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING,
+ HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
+ }
+ maxGlobalPrefetchedResultSize = conf.getLong(
+ MAX_PREFETCHED_RESULT_SIZE_KEY, MAX_PREFETCHED_RESULT_SIZE_DEFAULT);
+ if (globalPrefetchedResultSize.get() < maxGlobalPrefetchedResultSize) {
+ prefetchScanFuture = scanPrefetchThreadPool.submit(prefetcher);
+ }
+ prefetching = true;
+ }
+
+ /**
+ * This Callable abstracts calling a pre-fetch next. This is called on a
+ * threadpool. It makes a pre-fetch next call with the same parameters as
+ * the incoming next call. Note that the number of rows to return (nbRows)
+ * and/or the memory size for the result is the same as the previous call if
+ * pre-fetching is enabled. If these parameters change dynamically,
+ * they will take effect in the subsequent iteration.
+ */
+ class ScanPrefetcher implements Callable<ScanResult> {
+ boolean scannerClosing = false;
+
+ public ScanResult call() {
+ ScanResult scanResult = null;
+ Leases.Lease lease = null;
+ try {
+ // Remove lease while its being processed in server; protects against case
+ // where processing of request takes > lease expiration time.
+ lease = leases.removeLease(scannerName);
+ List<Result> results = new ArrayList<Result>(rows);
+ long currentScanResultSize = 0;
+ boolean moreResults = true;
+
+ boolean done = false;
+ long maxResultSize = scanner.getMaxResultSize();
+ if (maxResultSize <= 0) {
+ maxResultSize = maxScannerResultSize;
+ }
+ String threadName = Thread.currentThread().getName();
+ boolean prefetchingThread = threadName.startsWith(PREFETCHER_THREAD_PREFIX);
+ // Call coprocessor. Get region info from scanner.
+ if (region != null && region.getCoprocessorHost() != null) {
+ Boolean bypass = region.getCoprocessorHost().preScannerNext(
+ scanner, results, rows);
+ if (!results.isEmpty()
+ && (prefetchingThread || maxResultSize < Long.MAX_VALUE)) {
+ for (Result r : results) {
+ for (KeyValue kv : r.raw()) {
+ currentScanResultSize += kv.heapSize();
+ }
+ }
+ }
+ if (bypass != null && bypass.booleanValue()) {
+ done = true;
+ }
+ }
+
+ if (!done) {
+ List<KeyValue> values = new ArrayList<KeyValue>();
+ MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint());
+ region.startRegionOperation();
+ try {
+ int i = 0;
+ synchronized(scanner) {
+ for (; i < rows
+ && currentScanResultSize < maxResultSize; i++) {
+ // Collect values to be returned here
+ boolean moreRows = scanner.nextRaw(values);
+ if (!values.isEmpty()) {
+ if (prefetchingThread || maxResultSize < Long.MAX_VALUE){
+ for (KeyValue kv : values) {
+ currentScanResultSize += kv.heapSize();
+ }
+ }
+ results.add(new Result(values));
+ }
+ if (!moreRows) {
+ break;
+ }
+ values.clear();
+ }
+ }
+ region.readRequestsCount.add(i);
+ } finally {
+ region.closeRegionOperation();
+ }
+
+ // coprocessor postNext hook
+ if (region != null && region.getCoprocessorHost() != null) {
+ region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
+ }
+ }
+
+ // If the scanner's filter - if any - is done with the scan
+ // and wants to tell the client to stop the scan. This is done by passing
+ // a null result, and setting moreResults to false.
+ if (scanner.isFilterDone() && results.isEmpty()) {
+ moreResults = false;
+ results = null;
+ }
+ scanResult = new ScanResult(moreResults, results);
+ if (prefetchingThread && currentScanResultSize > 0) {
+ synchronized (prefetcher) {
+ if (!scannerClosing) {
+ globalPrefetchedResultSize.addAndGet(currentScanResultSize);
+ prefetchedResultSize = currentScanResultSize;
+ }
+ }
+ }
+ } catch (IOException e) {
+ // we should queue the exception as the result so that we can return
+ // this when the result is asked for
+ scanResult = new ScanResult(e);
+ } finally {
+ // We're done. On way out re-add the above removed lease.
+ // Adding resets expiration time on lease.
+ if (scanners.containsKey(scannerName)) {
+ if (lease != null) {
+ try {
+ leases.addLease(lease);
+ } catch (LeaseStillHeldException e) {
+ LOG.error("THIS SHOULD NOT HAPPEN", e);
+ }
+ }
+ }
+ }
+ return scanResult;
+ }
+ }
+}
+
+/**
+ * This class abstracts the results of a single scanner's result. It tracks
+ * the list of Result objects if the pre-fetch next was successful, and
+ * tracks the exception if the next failed.
+ */
+class ScanResult {
+ final boolean isException;
+ IOException ioException = null;
+
+ List<Result> results = null;
+ boolean moreResults = false;
+
+ public ScanResult(IOException ioException) {
+ this.ioException = ioException;
+ isException = true;
+ }
+
+ public ScanResult(boolean moreResults, List<Result> results) {
+ this.moreResults = moreResults;
+ this.results = results;
+ isException = false;
+ }
+}
Propchange: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerHolder.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java?rev=1486246&r1=1486245&r2=1486246&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java Fri May 24 22:51:22 2013
@@ -17,31 +17,41 @@
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTestConst;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
+import org.apache.hadoop.hbase.regionserver.RegionScannerHolder;
import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.After;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.AfterClass;
-import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
/**
* A client-side test, mostly testing scanners with various parameters.
*/
@Category(MediumTests.class)
+@RunWith(Parameterized.class)
public class TestScannersFromClientSide {
private static final Log LOG = LogFactory.getLog(TestScannersFromClientSide.class);
@@ -51,6 +61,37 @@ public class TestScannersFromClientSide
private static byte [] QUALIFIER = Bytes.toBytes("testQualifier");
private static byte [] VALUE = Bytes.toBytes("testValue");
+ private final boolean prefetching;
+ private long maxSize;
+
+ @Parameters
+ public static final Collection<Object[]> parameters() {
+ List<Object[]> prefetchings = new ArrayList<Object[]>();
+ prefetchings.add(new Object[] {Long.valueOf(-1)});
+ prefetchings.add(new Object[] {Long.valueOf(0)});
+ prefetchings.add(new Object[] {Long.valueOf(1)});
+ prefetchings.add(new Object[] {Long.valueOf(1024)});
+ return prefetchings;
+ }
+
+ public TestScannersFromClientSide(Long maxPrefetchedResultSize) {
+ this.maxSize = maxPrefetchedResultSize.longValue();
+ if (this.maxSize < 0) {
+ this.prefetching = false;
+ } else {
+ this.prefetching = true;
+ if (this.maxSize == 0) {
+ this.maxSize = RegionScannerHolder.MAX_PREFETCHED_RESULT_SIZE_DEFAULT;
+ } else {
+ MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+ for (JVMClusterUtil.RegionServerThread rst: cluster.getLiveRegionServerThreads()) {
+ Configuration conf = rst.getRegionServer().getConfiguration();
+ conf.setLong(RegionScannerHolder.MAX_PREFETCHED_RESULT_SIZE_KEY, maxSize);
+ }
+ }
+ }
+ }
+
/**
* @throws java.lang.Exception
*/
@@ -65,22 +106,9 @@ public class TestScannersFromClientSide
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
- }
-
- /**
- * @throws java.lang.Exception
- */
- @Before
- public void setUp() throws Exception {
- // Nothing to do.
- }
-
- /**
- * @throws java.lang.Exception
- */
- @After
- public void tearDown() throws Exception {
- // Nothing to do.
+ long remainingPrefetchedSize = RegionScannerHolder.getPrefetchedResultSize();
+ assertEquals("All prefetched results should be gone",
+ 0, remainingPrefetchedSize);
}
/**
@@ -89,8 +117,23 @@ public class TestScannersFromClientSide
* @throws Exception
*/
@Test
+ public void testScanBatchWithDefaultCaching() throws Exception {
+ batchedScanWithCachingSpecified(-1); // Using default caching which is 100
+ }
+
+ /**
+ * Test from client side for batch of scan
+ *
+ * @throws Exception
+ */
+ @Test
public void testScanBatch() throws Exception {
- byte [] TABLE = Bytes.toBytes("testScanBatch");
+ batchedScanWithCachingSpecified(1);
+ }
+
+ private void batchedScanWithCachingSpecified(int caching) throws Exception {
+ byte [] TABLE = Bytes.toBytes(
+ "testScanBatch-" + prefetching + "_" + maxSize + "_" + caching);
byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 8);
HTable ht = TEST_UTIL.createTable(TABLE, FAMILY);
@@ -99,7 +142,7 @@ public class TestScannersFromClientSide
Scan scan;
Delete delete;
Result result;
- ResultScanner scanner;
+ ClientScanner scanner;
boolean toLog = true;
List<KeyValue> kvListExp;
@@ -124,8 +167,11 @@ public class TestScannersFromClientSide
// without batch
scan = new Scan(ROW);
+ scan.setCaching(caching);
scan.setMaxVersions();
- scanner = ht.getScanner(scan);
+ scan.setPrefetching(prefetching);
+ scanner = (ClientScanner)ht.getScanner(scan);
+ verifyPrefetching(scanner);
// c4:4, c5:5, c6:6, c7:7
kvListExp = new ArrayList<KeyValue>();
@@ -135,12 +181,16 @@ public class TestScannersFromClientSide
kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[7], 7, VALUE));
result = scanner.next();
verifyResult(result, kvListExp, toLog, "Testing first batch of scan");
+ verifyPrefetching(scanner);
// with batch
scan = new Scan(ROW);
+ scan.setCaching(caching);
scan.setMaxVersions();
scan.setBatch(2);
- scanner = ht.getScanner(scan);
+ scan.setPrefetching(prefetching);
+ scanner = (ClientScanner)ht.getScanner(scan);
+ verifyPrefetching(scanner);
// First batch: c4:4, c5:5
kvListExp = new ArrayList<KeyValue>();
@@ -148,6 +198,7 @@ public class TestScannersFromClientSide
kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[5], 5, VALUE));
result = scanner.next();
verifyResult(result, kvListExp, toLog, "Testing first batch of scan");
+ verifyPrefetching(scanner);
// Second batch: c6:6, c7:7
kvListExp = new ArrayList<KeyValue>();
@@ -155,7 +206,7 @@ public class TestScannersFromClientSide
kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[7], 7, VALUE));
result = scanner.next();
verifyResult(result, kvListExp, toLog, "Testing second batch of scan");
-
+ verifyPrefetching(scanner);
}
/**
@@ -165,7 +216,7 @@ public class TestScannersFromClientSide
*/
@Test
public void testGetMaxResults() throws Exception {
- byte [] TABLE = Bytes.toBytes("testGetMaxResults");
+ byte [] TABLE = Bytes.toBytes("testGetMaxResults-" + prefetching + "_" + maxSize);
byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20);
@@ -285,7 +336,7 @@ public class TestScannersFromClientSide
*/
@Test
public void testScanMaxResults() throws Exception {
- byte [] TABLE = Bytes.toBytes("testScanLimit");
+ byte [] TABLE = Bytes.toBytes("testScanLimit-" + prefetching + "_" + maxSize);
byte [][] ROWS = HTestConst.makeNAscii(ROW, 2);
byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 10);
@@ -315,17 +366,19 @@ public class TestScannersFromClientSide
}
scan = new Scan();
+ scan.setCaching(1);
+ scan.setPrefetching(prefetching);
scan.setMaxResultsPerColumnFamily(4);
- ResultScanner scanner = ht.getScanner(scan);
+ ClientScanner scanner = (ClientScanner)ht.getScanner(scan);
kvListScan = new ArrayList<KeyValue>();
while ((result = scanner.next()) != null) {
+ verifyPrefetching(scanner);
for (KeyValue kv : result.list()) {
kvListScan.add(kv);
}
}
result = new Result(kvListScan);
verifyResult(result, kvListExp, toLog, "Testing scan with maxResults");
-
}
/**
@@ -335,7 +388,7 @@ public class TestScannersFromClientSide
*/
@Test
public void testGetRowOffset() throws Exception {
- byte [] TABLE = Bytes.toBytes("testGetRowOffset");
+ byte [] TABLE = Bytes.toBytes("testGetRowOffset-" + prefetching + "_" + maxSize);
byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20);
@@ -421,7 +474,47 @@ public class TestScannersFromClientSide
kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[5], 1, VALUE));
verifyResult(result, kvListExp, toLog,
"Testing offset + multiple CFs + maxResults");
+ }
+ /**
+ * For testing only, find a region scanner holder for a scan.
+ */
+ RegionScannerHolder findRegionScannerHolder(ClientScanner scanner) {
+ long scannerId = scanner.currentScannerId();
+ if (scannerId == -1L) return null;
+
+ HRegionInfo expectedRegion = scanner.currentRegionInfo();
+ MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+ for (JVMClusterUtil.RegionServerThread rst: cluster.getLiveRegionServerThreads()) {
+ RegionScannerHolder rsh = rst.getRegionServer().getScannerHolder(scannerId);
+ if (rsh != null && rsh.getRegionInfo().equals(expectedRegion)) {
+ return rsh;
+ }
+ }
+ return null;
+ }
+
+ void verifyPrefetching(ClientScanner scanner) throws IOException {
+ long scannerId = scanner.currentScannerId();
+ if (scannerId == -1L) return; // scanner is already closed
+ RegionScannerHolder rsh = findRegionScannerHolder(scanner);
+ assertNotNull("We should be able to find the scanner", rsh);
+ boolean isPrefetchSubmitted = rsh.isPrefetchSubmitted();
+ if (prefetching && (RegionScannerHolder.getPrefetchedResultSize() < this.maxSize)) {
+ assertTrue("Prefetching should be submitted or no more result",
+ isPrefetchSubmitted || scanner.next() == null);
+ } else if (isPrefetchSubmitted) {
+ // Prefetch submitted, it must be because prefetching is enabled,
+ // and there was still room before it's scheduled
+ long sizeBefore = RegionScannerHolder.getPrefetchedResultSize()
+ - rsh.currentPrefetchedResultSize();
+ assertTrue("There should have room before prefetching is submitted",
+ prefetching && sizeBefore < this.maxSize);
+ }
+ if (isPrefetchSubmitted && rsh.waitForPrefetchingDone()) {
+ assertTrue("Prefetched result size should not be 0",
+ rsh.currentPrefetchedResultSize() > 0);
+ }
}
static void verifyResult(Result result, List<KeyValue> expKvList, boolean toLog,
@@ -449,6 +542,4 @@ public class TestScannersFromClientSide
assertEquals(expKvList.size(), result.size());
}
-
-
}
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java?rev=1486246&r1=1486245&r2=1486246&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java Fri May 24 22:51:22 2013
@@ -58,7 +58,6 @@ import org.apache.hadoop.hbase.protobuf.
import org.apache.hadoop.hbase.regionserver.BaseRowProcessor;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
@@ -130,6 +129,7 @@ public class TestRowProcessorEndpoint {
// ignore table not found
}
table = util.createTable(TABLE, FAM);
+ table.setAutoFlush(false);
{
Put put = new Put(ROW);
put.add(FAM, A, Bytes.add(B, C)); // B, C are friends of A
@@ -143,6 +143,8 @@ public class TestRowProcessorEndpoint {
put.add(FAM, F, G);
table.put(put);
row2Size = put.size();
+ table.clearRegionCache();
+ table.flushCommits();
}
@Test
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java?rev=1486246&r1=1486245&r2=1486246&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java Fri May 24 22:51:22 2013
@@ -284,6 +284,7 @@ public class TestProtobufUtil {
scanBuilder = ClientProtos.Scan.newBuilder(proto);
scanBuilder.setMaxVersions(1);
scanBuilder.setCacheBlocks(true);
+ scanBuilder.setPrefetching(true);
Scan scan = ProtobufUtil.toScan(proto);
assertEquals(scanBuilder.build(), ProtobufUtil.toScan(scan));
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java?rev=1486246&r1=1486245&r2=1486246&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java Fri May 24 22:51:22 2013
@@ -335,6 +335,7 @@ public class TestRegionServerMetrics {
Scan s = new Scan();
s.setBatch(1);
s.setCaching(1);
+ s.setPrefetching(false);
ResultScanner resultScanners = t.getScanner(s);
for (int nextCount = 0; nextCount < 30; nextCount++) {