You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2012/12/04 00:26:13 UTC
svn commit: r1416727 - in /hbase/branches/0.89-fb/src:
main/java/org/apache/hadoop/hbase/client/
main/java/org/apache/hadoop/hbase/regionserver/
main/java/org/apache/hadoop/hbase/util/
test/java/org/apache/hadoop/hbase/benchmarks/ test/java/org/apache/...
Author: liyin
Date: Mon Dec 3 23:26:11 2012
New Revision: 1416727
URL: http://svn.apache.org/viewvc?rev=1416727&view=rev
Log:
[HBASE-6874] Implement prefetching for scanners
Author: kranganathan
Summary: Every time a scan makes a next call, the previously fetched value (if any) is returned and the subsequent fetch is issued in a background thread. This helps increase the scan throughput.
Test Plan: Tested on a single server setup, seems to increase throughput from 26MB/s to 39MB/s (see https://our.intern.facebook.com/intern/wiki/index.php/HBase/PerfExperiments/HBase-Scans)
Reviewers: kannan, aaiyer, liyintang
Reviewed By: liyintang
CC: hbase-eng@
Differential Revision: https://phabricator.fb.com/D583120
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Threads.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/Benchmark.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/ScanBenchmark.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java?rev=1416727&r1=1416726&r2=1416727&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java Mon Dec 3 23:26:11 2012
@@ -88,7 +88,8 @@ public class Scan extends Operation impl
private static final byte STORE_OFFSET_VERSION = (byte)3;
private static final byte RESPONSE_SIZE_VERSION = (byte)4;
private static final byte FLASHBACK_VERSION = (byte) 5;
- private static final byte SCAN_VERSION = FLASHBACK_VERSION;
+ private static final byte PREFETCH_VERSION = (byte) 6;
+ private static final byte SCAN_VERSION = PREFETCH_VERSION;
private byte [] startRow = HConstants.EMPTY_START_ROW;
private byte [] stopRow = HConstants.EMPTY_END_ROW;
@@ -97,6 +98,7 @@ public class Scan extends Operation impl
private int storeLimit = -1;
private int storeOffset = 0;
private int caching = -1;
+ private boolean serverPrefetching = false;
private int maxResponseSize = HConstants.DEFAULT_HBASE_SCANNER_MAX_RESULT_SIZE;
private int currentPartialResponseSize = 0;
private boolean partialRow = false;
@@ -152,6 +154,9 @@ public class Scan extends Operation impl
storeLimit = scan.getMaxResultsPerColumnFamily();
storeOffset = scan.getRowOffsetPerColumnFamily();
caching = scan.getCaching();
+ serverPrefetching = scan.getServerPrefetching();
+ maxResponseSize = scan.getMaxResponseSize();
+ partialRow = scan.isPartialRow();
cacheBlocks = scan.getCacheBlocks();
filter = scan.getFilter(); // clone?
TimeRange ctr = scan.getTimeRange();
@@ -168,6 +173,7 @@ public class Scan extends Operation impl
addFamily(fam);
}
}
+ effectiveTS = scan.getEffectiveTS();
}
/**
@@ -371,6 +377,20 @@ public class Scan extends Operation impl
}
/**
+ * Enables or disables pre-fetching on the region server for this scan. If
+ * enabled, the region server will try to read the next scan result ahead of
+ * time. This improves scan performance if we are doing large scans.
+ * @param enablePrefetching true to enable pre-fetching, false to disable it
+ */
+ public void setServerPrefetching(boolean enablePrefetching) {
+ this.serverPrefetching = enablePrefetching;
+ }
+ /** @return true if pre-fetching on the region server is enabled for this scan */
+ public boolean getServerPrefetching() {
+ return serverPrefetching;
+ }
+
+ /**
* @return maximum response size that client can handle for a single call to next()
*/
public int getMaxResponseSize() {
@@ -675,6 +695,9 @@ public class Scan extends Operation impl
if (version >= FLASHBACK_VERSION) {
effectiveTS = in.readLong();
}
+ if (version >= PREFETCH_VERSION) {
+ serverPrefetching = in.readBoolean();
+ }
this.caching = in.readInt();
this.cacheBlocks = in.readBoolean();
if(in.readBoolean()) {
@@ -703,7 +726,9 @@ public class Scan extends Operation impl
// We try to talk a protocol version as low as possible so that we can be
// backward compatible as far as possible.
byte version = (byte) 1;
- if (effectiveTS != HConstants.LATEST_TIMESTAMP) {
+ if (serverPrefetching) {
+ version = PREFETCH_VERSION;
+ } else if (effectiveTS != HConstants.LATEST_TIMESTAMP) {
version = FLASHBACK_VERSION;
} else if (this.maxResponseSize
!= HConstants.DEFAULT_HBASE_SCANNER_MAX_RESULT_SIZE) {
@@ -732,6 +757,9 @@ public class Scan extends Operation impl
if (version >= FLASHBACK_VERSION) {
out.writeLong(effectiveTS);
}
+ if (version >= PREFETCH_VERSION) {
+ out.writeBoolean(serverPrefetching);
+ }
out.writeInt(this.caching);
out.writeBoolean(this.cacheBlocks);
if(this.filter == null) {
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1416727&r1=1416726&r2=1416727&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Mon Dec 3 23:26:11 2012
@@ -91,12 +91,15 @@ import org.apache.hadoop.hbase.ipc.HRegi
import org.apache.hadoop.hbase.metrics.RequestMetrics;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.regionserver.HRegion.RegionScanner.ScanPrefetcher;
+import org.apache.hadoop.hbase.regionserver.HRegion.RegionScanner.ScanResult;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.DaemonThreadFactory;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
@@ -526,7 +529,7 @@ public class HRegion implements HeapSize
this.waitOnMemstoreBlock =
conf.getBoolean(HConstants.HREGION_MEMSTORE_WAIT_ON_BLOCK, true);
this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
-
+
this.readRequests =new RequestMetrics();
this.writeRequests =new RequestMetrics();
}
@@ -3028,6 +3031,7 @@ public class HRegion implements HeapSize
private boolean filterClosed = false;
private long readPt;
private Scan originalScan;
+ private Future<ScanResult> prefetchScanFuture = null;
RegionScanner(Scan scan, List<KeyValueScanner> additionalScanners) throws IOException {
//DebugPrint.println("HRegionScanner.<init>");
@@ -3102,6 +3106,93 @@ public class HRegion implements HeapSize
}
/**
+ * Holds the outcome of one scanner next call. It carries either the Result
+ * objects that were read together with the moreRows flag, or the
+ * IOException that made the call fail.
+ */
+ class ScanResult {
+ final boolean isException; // true when this outcome wraps a failure
+ IOException ioException; // assigned only by the failure constructor
+ Result[] outResults; // assigned only by the success constructor
+ boolean moreRows; // assigned only by the success constructor
+
+ public ScanResult(IOException ioException) {
+ this.ioException = ioException;
+ this.isException = true;
+ }
+
+ public ScanResult(boolean moreRows, Result[] outResults) {
+ this.outResults = outResults;
+ this.moreRows = moreRows;
+ this.isException = false;
+ }
+ }
+
+ /**
+ * This Callable performs a single next call on the scanner, either inline or
+ * as a pre-fetch on a threadpool. It is built with the nbRows, limit and
+ * metric of the incoming next call, so if those params change dynamically
+ * they take effect in the subsequent iteration. An IOException is returned
+ * inside the ScanResult instead of being thrown. When a filter has ended the
+ * scan and no rows were read, outResults is null so the client stops scanning.
+ */
+ class ScanPrefetcher implements Callable<ScanResult> {
+ int nbRows;
+ int limit;
+ String metric;
+
+ ScanPrefetcher(int nbRows, int limit, String metric) {
+ this.nbRows = nbRows;
+ this.limit = limit;
+ this.metric = metric;
+ }
+
+ public ScanResult call() {
+ ScanResult scanResult = null;
+ List<Result> outResults = new ArrayList<Result>();
+ List<KeyValue> tmpList = new ArrayList<KeyValue>();
+ int currentNbRows = 0;
+ boolean moreRows = true;
+ try {
+ // This is necessary b/c partialResponseSize is not serialized through
+ // RPC
+ getOriginalScan().setCurrentPartialResponseSize(0);
+ int maxResponseSize = getOriginalScan().getMaxResponseSize();
+ do {
+ moreRows = nextInternal(tmpList, limit, metric);
+ if (!tmpList.isEmpty()) {
+ currentNbRows++;
+ outResults.add(new Result(tmpList));
+ tmpList.clear();
+ }
+ resetFilters();
+ if (isFilterDone()) {
+ break;
+ }
+ // While Condition
+ // 1. respect maxResponseSize and nbRows whichever comes first,
+ // 2. recheck the currentPartialResponseSize is to catch the case
+ // where maxResponseSize is saturated and partialRow == false
+ // since we allow this case valid in the nextInternal() layer
+ } while (moreRows
+ && (getOriginalScan().getCurrentPartialResponseSize() <
+ maxResponseSize && currentNbRows < nbRows));
+ readRequests.incrTotalRequstCount(currentNbRows);
+ // If the filter is done with the scan and no rows were read, pass a
+ // null result: that is what tells the client to stop the scan.
+ boolean stopScan = isFilterDone() && outResults.isEmpty();
+ scanResult = new ScanResult(moreRows,
+ stopScan ? null : outResults.toArray(new Result[0]));
+ } catch (IOException e) {
+ // we should queue the exception as the result so that we can return
+ // this when the result is asked for
+ scanResult = new ScanResult(e);
+ }
+ return scanResult;
+ }
+ }
+
+ /**
* A method to return all the rows that can fit in the response size.
* it respects the two stop conditions:
* 1) scan.getMaxResponseSize
@@ -3115,41 +3206,42 @@ public class HRegion implements HeapSize
*
* This is used by Scans.
*/
- public synchronized void nextRows(List<Result> outResults, int nbRows,
- String metric) throws IOException {
- preCondition();
- List<KeyValue> tmpList = new ArrayList<KeyValue>();
+ public synchronized Result[] nextRows(int nbRows, String metric)
+ throws IOException {
+ preCondition();
+ boolean prefetchingEnabled = getOriginalScan().getServerPrefetching();
int limit = this.getOriginalScan().getBatch();
- int currentNbRows = 0;
- boolean moreRows = true;
- // This is necessary b/c partialResponseSize is not serialized through RPC
- getOriginalScan().setCurrentPartialResponseSize(0);
- int maxResponseSize = getOriginalScan().getMaxResponseSize();
- do {
- moreRows = nextInternal(tmpList, limit, metric);
- if (!tmpList.isEmpty()) {
- currentNbRows++;
- if (outResults != null) {
- outResults.add(new Result(tmpList));
- tmpList.clear();
- }
+ ScanResult scanResult;
+ // if we have a prefetched result, then use it
+ if (prefetchingEnabled && prefetchScanFuture != null) {
+ try {
+ scanResult = prefetchScanFuture.get();
+ prefetchScanFuture = null;
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ } catch (ExecutionException e) {
+ throw new IOException(e);
}
- resetFilters();
- if (isFilterDone()) {
- readRequests.incrTotalRequstCount(currentNbRows);
- return;
+ if (scanResult.isException) {
+ throw scanResult.ioException;
}
-
- // While Condition
- // 1. respect maxResponseSize and nbRows whichever comes first,
- // 2. recheck the currentPartialResponseSize is to catch the case
- // where maxResponseSize is saturated and partialRow == false
- // since we allow this case valid in the nextInternal() layer
- } while (moreRows &&
- (getOriginalScan().getCurrentPartialResponseSize() < maxResponseSize
- && currentNbRows < nbRows));
-
- readRequests.incrTotalRequstCount(currentNbRows);
+ }
+ // if there are no prefetched results, then perform the scan inline
+ else {
+ ScanPrefetcher scanFetch = new ScanPrefetcher(nbRows, limit, metric);
+ scanResult = scanFetch.call();
+ }
+
+ // schedule a background prefetch for the next result if prefetch is
+ // enabled on scans
+ boolean scanDone =
+ (scanResult.outResults == null || scanResult.outResults.length == 0);
+ if (prefetchingEnabled && !scanDone) {
+ ScanPrefetcher callable = new ScanPrefetcher(nbRows, limit, metric);
+ prefetchScanFuture = HRegionServer.scanPrefetchThreadPool.submit(callable);
+ }
+
+ return scanResult.outResults;
}
/**
@@ -3196,7 +3288,7 @@ public class HRegion implements HeapSize
/*
* @return True if a filter rules the scanner is over, done.
*/
- synchronized boolean isFilterDone() {
+ private boolean isFilterDone() {
return this.filter != null && this.filter.filterAllRemaining();
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1416727&r1=1416726&r2=1416727&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Mon Dec 3 23:26:11 2012
@@ -54,6 +54,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -271,6 +272,11 @@ public class HRegionServer implements HR
/* Check for major compactions.
*/
Chore majorCompactionChecker;
+ /*
+ * Threadpool for doing scanner prefetches
+ */
+ public static ThreadPoolExecutor scanPrefetchThreadPool;
+
// An array of HLog and HLog roller. log is protected rather than private to avoid
// eclipse warning when accessed by inner classes
@@ -814,6 +820,9 @@ public class HRegionServer implements HR
hlogRollers[i].interruptIfNecessary();
}
this.majorCompactionChecker.interrupt();
+
+ // shutdown the prefetch threads
+ scanPrefetchThreadPool.shutdownNow();
if (killed) {
// Just skip out w/o closing regions.
@@ -1546,6 +1555,12 @@ public class HRegionServer implements HR
this.splitLogWorkers.add(splitLogWorker);
splitLogWorker.start();
}
+ // start the scanner prefetch threadpool
+ int numHandlers = conf.getInt("hbase.regionserver.handler.count", 10);
+ scanPrefetchThreadPool =
+ Threads.getBlockingThreadPool(numHandlers, 60, TimeUnit.SECONDS,
+ new DaemonThreadFactory("scan-prefetch-"));
+
LOG.info("HRegionServer started at: " +
this.serverInfo.getServerAddress().toString());
}
@@ -2616,14 +2631,11 @@ public class HRegionServer implements HR
throw e;
}
this.leases.renewLease(scannerName);
- List<Result> results = new ArrayList<Result>();
- s.nextRows(results, nbRows, HRegion.METRIC_NEXTSIZE);
- numReads.addAndGet(results.size());
- // IF its filter if any is done with the scan
- // and wants to tell the client to stop the scan. This is done by passing
- // a null result.
- return s.isFilterDone() && results.isEmpty()?
- null: results.toArray(new Result[0]);
+ Result[] results = s.nextRows(nbRows, HRegion.METRIC_NEXTSIZE);
+ if (results != null) {
+ numReads.addAndGet(results.length);
+ }
+ return results;
} catch (Throwable t) {
if (t instanceof NotServingRegionException) {
String scannerName = String.valueOf(scannerId);
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Threads.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Threads.java?rev=1416727&r1=1416726&r2=1416727&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Threads.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Threads.java Mon Dec 3 23:26:11 2012
@@ -22,6 +22,9 @@ package org.apache.hadoop.hbase.util;
import java.io.PrintWriter;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -204,6 +207,41 @@ public class Threads {
return boundedCachedThreadPool;
}
+ /**
+ * Creates a ThreadPoolExecutor that runs at most blockingLimit tasks at a
+ * time. Submitting a task while all threads are busy blocks the calling
+ * thread until a thread frees up. Idle threads time out, core thread included.
+ * @param blockingLimit max number of tasks that can be submitted
+ * @param timeout time value after which unused threads are killed
+ * @param unit time unit for killing unused threads
+ * @param threadFactory thread factory to use to spawn threads
+ * @return the ThreadPoolExecutor
+ */
+ public static ThreadPoolExecutor getBlockingThreadPool(
+ int blockingLimit, long timeout, TimeUnit unit,
+ ThreadFactory threadFactory) {
+ ThreadPoolExecutor blockingThreadPool = new ThreadPoolExecutor(
+ 1, blockingLimit, timeout, unit,
+ new SynchronousQueue<Runnable>(), threadFactory,
+ new RejectedExecutionHandler() {
+ @Override
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+ if (executor.isShutdown()) { // nobody would ever take the task
+ throw new RejectedExecutionException("Thread pool is shut down");
+ }
+ try {
+ executor.getQueue().put(r); // blocks the submitter until a thread is free
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // keep the interrupt visible to callers
+ throw new RejectedExecutionException(
+ "Failed to requeue the rejected request because of ", e);
+ }
+ }
+ });
+ blockingThreadPool.allowCoreThreadTimeOut(true);
+ return blockingThreadPool;
+ }
+
public static void renameThread(Thread t, String newName) {
String oldName = t.getName();
if (!t.equals(newName)) {
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/Benchmark.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/Benchmark.java?rev=1416727&r1=1416726&r2=1416727&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/Benchmark.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/Benchmark.java Mon Dec 3 23:26:11 2012
@@ -219,11 +219,16 @@ public abstract class Benchmark {
// bulk load some data into the tables
long numKVsInRegion = Math.round(numKVs * 1.0 / numRegions);
for (HRegionInfo hRegionInfo : regionsToRS.keySet()) {
- // skip the first region which has an empty start key
- if ("".equals(new String(hRegionInfo.getStartKey()))) {
+ // skip the first region which has an empty start key in case of
+ // multiple regions
+ if (numRegions > 1 &&
+ "".equals(new String(hRegionInfo.getStartKey()))) {
continue;
}
- long startKey = getLongFromRowKey(hRegionInfo.getStartKey());
+ long startKey = 0;
+ try {
+ startKey = getLongFromRowKey(hRegionInfo.getStartKey());
+ } catch (NumberFormatException e) { }
long rowID = startKey;
for (; rowID < startKey + numKVsInRegion; rowID++) {
byte[] row = getRowKeyFromLong(rowID);
@@ -247,7 +252,8 @@ public abstract class Benchmark {
long numKVsInRegion = Math.round(numKVs * 1.0 / numRegions);
for (HRegionInfo hRegionInfo : regionsToRS.keySet()) {
// skip the first region which has an empty start key
- if ("".equals(new String(hRegionInfo.getStartKey()))) {
+ if (numRegions > 1 &&
+ "".equals(new String(hRegionInfo.getStartKey()))) {
continue;
}
// get the region server
@@ -301,7 +307,10 @@ public abstract class Benchmark {
byte [] value = new byte[kvSize];
(new Random()).nextBytes(value);
- long startKey = getLongFromRowKey(hRegionInfo.getStartKey());
+ long startKey = 0;
+ try {
+ startKey = getLongFromRowKey(hRegionInfo.getStartKey());
+ } catch (NumberFormatException e) { }
long rowID = startKey;
for (; rowID < startKey + numKVsInRegion; rowID++) {
byte[] row = getRowKeyFromLong(rowID);
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/ScanBenchmark.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/ScanBenchmark.java?rev=1416727&r1=1416726&r2=1416727&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/ScanBenchmark.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/ScanBenchmark.java Mon Dec 3 23:26:11 2012
@@ -31,7 +31,7 @@ public class ScanBenchmark extends Bench
13000, 14000, 15000, 16000,
17000, 18000, 19000, 20000
};
- private static Integer[] SET_PREFETCH_VALUES = { 0 };
+ private static Integer[] SET_PREFETCH_VALUES = { 0, 1 };
public void initBenchmarkResults() {
List<String> header = new ArrayList<String>();
@@ -49,7 +49,7 @@ public class ScanBenchmark extends Bench
// warm block cache, force jit compilation
System.out.println("Warming blockcache and forcing JIT compilation...");
for (int i = 0; i < 20; i++) {
- runExperiment(false, 10000, 0);
+ runExperiment(false, 10000, 0);
}
for (int caching : SET_CACHING_VALUES) {
for (int prefetch : SET_PREFETCH_VALUES) {
@@ -71,6 +71,10 @@ public class ScanBenchmark extends Bench
scan.setMaxVersions(1);
// set caching
scan.setCaching(caching);
+ // set prefetch if needed
+ if (prefetch > 0) {
+ scan.setServerPrefetching(true);
+ }
long numKVs = 0;
long numBytes = 0;
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=1416727&r1=1416726&r2=1416727&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java Mon Dec 3 23:26:11 2012
@@ -1541,8 +1541,7 @@ public class TestHRegion extends HBaseTe
}
RegionScanner rs = (RegionScanner) region.getScanner(scan);
List<KeyValue> kvListScan = new ArrayList<KeyValue>();
- List<Result> results = new ArrayList<Result>();
- rs.nextRows(results, nbRows, null);
+ Result[] results = rs.nextRows(nbRows, null);
for (Result res : results) {
for (KeyValue kv : res.list()) {
kvListScan.add(kv);