Posted to commits@hbase.apache.org by li...@apache.org on 2012/12/04 00:26:13 UTC

svn commit: r1416727 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/util/ test/java/org/apache/hadoop/hbase/benchmarks/ test/java/org/apache/...

Author: liyin
Date: Mon Dec  3 23:26:11 2012
New Revision: 1416727

URL: http://svn.apache.org/viewvc?rev=1416727&view=rev
Log:
[HBASE-6874] Implement prefetching for scanners

Author: kranganathan

Summary: On every next call, the scan returns the previously prefetched result (if any) and issues the subsequent fetch on a background thread, overlapping server-side fetching with client processing and increasing scan throughput.
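
A minimal sketch of a client opting in (table setup and loop body are illustrative, not part of this commit):

    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;

    // 'table' is an open HTable for the table being scanned
    Scan scan = new Scan();
    scan.setCaching(10000);           // rows returned per next() RPC
    scan.setServerPrefetching(true);  // new API added by this change
    ResultScanner scanner = table.getScanner(scan);
    try {
      for (Result r : scanner) {
        // while this batch is consumed, the region server is already
        // fetching the next one on a background thread
      }
    } finally {
      scanner.close();
    }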

Test Plan: Tested on a single-server setup; it seems to increase throughput from 26MB/s to 39MB/s (see https://our.intern.facebook.com/intern/wiki/index.php/HBase/PerfExperiments/HBase-Scans)

Reviewers: kannan, aaiyer, liyintang

Reviewed By: liyintang

CC: hbase-eng@

Differential Revision: https://phabricator.fb.com/D583120

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Threads.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/Benchmark.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/ScanBenchmark.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java?rev=1416727&r1=1416726&r2=1416727&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java Mon Dec  3 23:26:11 2012
@@ -88,7 +88,8 @@ public class Scan extends Operation impl
   private static final byte STORE_OFFSET_VERSION = (byte)3;
   private static final byte RESPONSE_SIZE_VERSION = (byte)4;
   private static final byte FLASHBACK_VERSION = (byte) 5;
-  private static final byte SCAN_VERSION = FLASHBACK_VERSION;
+  private static final byte PREFETCH_VERSION = (byte) 6;
+  private static final byte SCAN_VERSION = PREFETCH_VERSION;
 
   private byte [] startRow = HConstants.EMPTY_START_ROW;
   private byte [] stopRow  = HConstants.EMPTY_END_ROW;
@@ -97,6 +98,7 @@ public class Scan extends Operation impl
   private int storeLimit = -1;
   private int storeOffset = 0;
   private int caching = -1;
+  private boolean serverPrefetching = false;
   private int maxResponseSize = HConstants.DEFAULT_HBASE_SCANNER_MAX_RESULT_SIZE;
   private int currentPartialResponseSize = 0;
   private boolean partialRow = false;
@@ -152,6 +154,9 @@ public class Scan extends Operation impl
     storeLimit = scan.getMaxResultsPerColumnFamily();
     storeOffset = scan.getRowOffsetPerColumnFamily();
     caching = scan.getCaching();
+    serverPrefetching = scan.getServerPrefetching();
+    maxResponseSize = scan.getMaxResponseSize();
+    partialRow = scan.isPartialRow();
     cacheBlocks = scan.getCacheBlocks();
     filter = scan.getFilter(); // clone?
     TimeRange ctr = scan.getTimeRange();
@@ -168,6 +173,7 @@ public class Scan extends Operation impl
         addFamily(fam);
       }
     }
+    effectiveTS = scan.getEffectiveTS();
   }
 
   /**
@@ -371,6 +377,20 @@ public class Scan extends Operation impl
   }
 
   /**
+   * Set whether pre-fetching is enabled on the region server. If enabled,
+   * the region server will try to read the next scan result ahead of time.
+   * This improves scan performance for large scans.
+   * @param enablePrefetching whether pre-fetching should be enabled
+   */
+  public void setServerPrefetching(boolean enablePrefetching) {
+    this.serverPrefetching = enablePrefetching;
+  }
+
+  public boolean getServerPrefetching() {
+    return serverPrefetching;
+  }
+  
+  /**
    * @return maximum response size that client can handle for a single call to next()
    */
   public int getMaxResponseSize() {
@@ -675,6 +695,9 @@ public class Scan extends Operation impl
     if (version >= FLASHBACK_VERSION) {
       effectiveTS = in.readLong();
     }
+    if (version >= PREFETCH_VERSION) {
+      serverPrefetching = in.readBoolean();
+    }
     this.caching = in.readInt();
     this.cacheBlocks = in.readBoolean();
     if(in.readBoolean()) {
@@ -703,7 +726,9 @@ public class Scan extends Operation impl
     // We try to talk a protocol version as low as possible so that we can be
     // backward compatible as far as possible.
     byte version = (byte) 1;
-    if (effectiveTS != HConstants.LATEST_TIMESTAMP) {
+    if (serverPrefetching) {
+      version = PREFETCH_VERSION;
+    } else if (effectiveTS != HConstants.LATEST_TIMESTAMP) {
       version = FLASHBACK_VERSION;
     } else if (this.maxResponseSize
         != HConstants.DEFAULT_HBASE_SCANNER_MAX_RESULT_SIZE) {
@@ -732,6 +757,9 @@ public class Scan extends Operation impl
     if (version >= FLASHBACK_VERSION) {
       out.writeLong(effectiveTS);
     }
+    if (version >= PREFETCH_VERSION) {
+      out.writeBoolean(serverPrefetching);
+    }
     out.writeInt(this.caching);
     out.writeBoolean(this.cacheBlocks);
     if(this.filter == null) {
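
The writeObject() change above relies on testing features from newest to oldest, so the chosen version is the highest one any in-use feature requires; the >=-gated writes then emit every field that version implies, and an older reader never encounters a gate above its own version. A condensed sketch of the pattern (constants and fields as in Scan.java, unrelated fields elided):

    byte version = (byte) 1;
    if (serverPrefetching) {                 // newest feature first
      version = PREFETCH_VERSION;            // implies all older fields too
    } else if (effectiveTS != HConstants.LATEST_TIMESTAMP) {
      version = FLASHBACK_VERSION;
    }
    out.writeByte(version);
    // ... fields for versions 1 through 4 ...
    if (version >= FLASHBACK_VERSION) {
      out.writeLong(effectiveTS);
    }
    if (version >= PREFETCH_VERSION) {
      out.writeBoolean(serverPrefetching);
    }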

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1416727&r1=1416726&r2=1416727&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Mon Dec  3 23:26:11 2012
@@ -91,12 +91,15 @@ import org.apache.hadoop.hbase.ipc.HRegi
 import org.apache.hadoop.hbase.metrics.RequestMetrics;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.regionserver.HRegion.RegionScanner.ScanPrefetcher;
+import org.apache.hadoop.hbase.regionserver.HRegion.RegionScanner.ScanResult;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.DaemonThreadFactory;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
@@ -526,7 +529,7 @@ public class HRegion implements HeapSize
     this.waitOnMemstoreBlock =
         conf.getBoolean(HConstants.HREGION_MEMSTORE_WAIT_ON_BLOCK, true);
     this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
-
+    
     this.readRequests =new RequestMetrics();
     this.writeRequests =new RequestMetrics();
   }
@@ -3028,6 +3031,7 @@ public class HRegion implements HeapSize
     private boolean filterClosed = false;
     private long readPt;
     private Scan originalScan;
+    private Future<ScanResult> prefetchScanFuture = null;
 
     RegionScanner(Scan scan, List<KeyValueScanner> additionalScanners) throws IOException {
       //DebugPrint.println("HRegionScanner.<init>");
@@ -3102,6 +3106,93 @@ public class HRegion implements HeapSize
     }
 
     /**
+     * This class holds the outcome of a single pre-fetch next call. It tracks
+     * the list of Result objects if the pre-fetch was successful, and the
+     * exception if the next call failed.
+     */
+    class ScanResult {
+      final boolean isException;
+      IOException ioException = null;
+      Result[] outResults;
+      boolean moreRows;
+
+      public ScanResult(IOException ioException) {
+        isException = true;
+        this.ioException = ioException;
+      }
+
+      public ScanResult(boolean moreRows, Result[] outResults) {
+        isException = false;
+        this.moreRows = moreRows;
+        this.outResults = outResults;
+      }
+    }
+
+    /**
+     * This Callable performs a pre-fetch next call on a thread pool, using
+     * the same parameters as the incoming next call. Note that when
+     * pre-fetching is enabled, the number of rows to return (nbRows) and the
+     * memory size for the result are carried over from the previous call; if
+     * these parameters change dynamically, they take effect on the
+     * subsequent iteration.
+     */
+    class ScanPrefetcher implements Callable<ScanResult> {
+      int nbRows;
+      int limit;
+      String metric;
+
+      ScanPrefetcher(int nbRows, int limit, String metric) {
+        this.nbRows = nbRows;
+        this.limit = limit;
+        this.metric = metric;
+      }
+
+      public ScanResult call() {
+        ScanResult scanResult = null;
+        List<Result> outResults = new ArrayList<Result>();
+        List<KeyValue> tmpList = new ArrayList<KeyValue>();
+        int currentNbRows = 0;
+        boolean moreRows = true;
+        try {
+          // This is necessary b/c partialResponseSize is not serialized through
+          // RPC
+          getOriginalScan().setCurrentPartialResponseSize(0);
+          int maxResponseSize = getOriginalScan().getMaxResponseSize();
+          do {
+            moreRows = nextInternal(tmpList, limit, metric);
+            if (!tmpList.isEmpty()) {
+              currentNbRows++;
+              if (outResults != null) {
+                outResults.add(new Result(tmpList));
+                tmpList.clear();
+              }
+            }
+            resetFilters();
+            if (isFilterDone()) {
+              break;
+            }
+
+            // While condition:
+            // 1. respect maxResponseSize and nbRows, whichever comes first;
+            // 2. recheck currentPartialResponseSize to catch the case where
+            // maxResponseSize is saturated and partialRow == false, since
+            // the nextInternal() layer allows that case as valid.
+          } while (moreRows
+              && (getOriginalScan().getCurrentPartialResponseSize() < 
+                  maxResponseSize && currentNbRows < nbRows));
+          readRequests.incrTotalRequstCount(currentNbRows);
+          scanResult = new ScanResult(moreRows, 
+              outResults.toArray(new Result[0]));
+        } catch (IOException e) {
+          // we should queue the exception as the result so that we can return
+          // this when the result is asked for
+          scanResult = new ScanResult(e);
+        }
+        return scanResult;
+      }
+    }
+
+    /**
      * A method to return all the rows that can fit in the response size.
      * it respects the two stop conditions:
      * 1) scan.getMaxResponseSize
@@ -3115,41 +3206,42 @@ public class HRegion implements HeapSize
      *
      * This is used by Scans.
      */
-    public synchronized void nextRows(List<Result> outResults, int nbRows, 
-        String metric) throws IOException {
-      preCondition(); 
-      List<KeyValue> tmpList = new ArrayList<KeyValue>();
+    public synchronized Result[] nextRows(int nbRows, String metric) 
+    throws IOException {
+      preCondition();
+      boolean prefetchingEnabled = getOriginalScan().getServerPrefetching();
       int limit = this.getOriginalScan().getBatch();
-      int currentNbRows = 0;
-      boolean moreRows = true;
-      // This is necessary b/c partialResponseSize is not serialized through RPC    
-      getOriginalScan().setCurrentPartialResponseSize(0);
-      int maxResponseSize = getOriginalScan().getMaxResponseSize();
-      do {
-        moreRows = nextInternal(tmpList, limit, metric);
-        if (!tmpList.isEmpty()) {
-          currentNbRows++;
-          if (outResults != null) {
-            outResults.add(new Result(tmpList));
-            tmpList.clear();
-          }
+      ScanResult scanResult;
+      // if we have a prefetched result, then use it
+      if (prefetchingEnabled && prefetchScanFuture != null) {
+        try {
+          scanResult = prefetchScanFuture.get();
+          prefetchScanFuture = null;
+        } catch (InterruptedException e) {
+          throw new IOException(e);
+        } catch (ExecutionException e) {
+          throw new IOException(e);
         }
-        resetFilters();
-        if (isFilterDone()) {
-          readRequests.incrTotalRequstCount(currentNbRows);
-          return;
+        if (scanResult.isException) {
+          throw scanResult.ioException;
         }
-         
-        // While Condition
-        // 1. respect maxResponseSize and nbRows whichever comes first,
-        // 2. recheck the currentPartialResponseSize is to catch the case
-        //   where maxResponseSize is saturated and partialRow == false 
-        //   since we allow this case valid in the nextInternal() layer
-      } while (moreRows && 
-          (getOriginalScan().getCurrentPartialResponseSize() < maxResponseSize 
-           && currentNbRows < nbRows));
-       
-      readRequests.incrTotalRequstCount(currentNbRows);
+      }
+      // if there are no prefetched results, then perform the scan inline
+      else {
+        ScanPrefetcher scanFetch = new ScanPrefetcher(nbRows, limit, metric);
+        scanResult = scanFetch.call();
+      }
+
+      // schedule a background prefetch for the next result if prefetch is
+      // enabled on scans
+      boolean scanDone = 
+        (scanResult.outResults == null || scanResult.outResults.length == 0);
+      if (prefetchingEnabled && !scanDone) {
+        ScanPrefetcher callable = new ScanPrefetcher(nbRows, limit, metric);
+        prefetchScanFuture = HRegionServer.scanPrefetchThreadPool.submit(callable);
+      }
+      
+      return scanResult.outResults;
     }
     
     /**
@@ -3196,7 +3288,7 @@ public class HRegion implements HeapSize
     /*
      * @return True if a filter rules the scanner is over, done.
      */
-    synchronized boolean isFilterDone() {
+    private boolean isFilterDone() {
       return this.filter != null && this.filter.filterAllRemaining();
     }
 

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1416727&r1=1416726&r2=1416727&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Mon Dec  3 23:26:11 2012
@@ -54,6 +54,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -271,6 +272,11 @@ public class HRegionServer implements HR
   /* Check for major compactions.
    */
   Chore majorCompactionChecker;
+  /*
+   * Threadpool for doing scanner prefetches
+   */
+  public static ThreadPoolExecutor scanPrefetchThreadPool;
+
 
   // An array of HLog and HLog roller.  log is protected rather than private to avoid
   // eclipse warning when accessed by inner classes
@@ -814,6 +820,9 @@ public class HRegionServer implements HR
       hlogRollers[i].interruptIfNecessary();  
     }
     this.majorCompactionChecker.interrupt();
+    
+    // shutdown the prefetch threads
+    scanPrefetchThreadPool.shutdownNow();
 
     if (killed) {
       // Just skip out w/o closing regions.
@@ -1546,6 +1555,12 @@ public class HRegionServer implements HR
       this.splitLogWorkers.add(splitLogWorker);
       splitLogWorker.start();
     }
+    // start the scanner prefetch threadpool
+    int numHandlers = conf.getInt("hbase.regionserver.handler.count", 10);
+    scanPrefetchThreadPool = 
+      Threads.getBlockingThreadPool(numHandlers, 60, TimeUnit.SECONDS, 
+          new DaemonThreadFactory("scan-prefetch-"));
+
     LOG.info("HRegionServer started at: " +
       this.serverInfo.getServerAddress().toString());
   }
@@ -2616,14 +2631,11 @@ public class HRegionServer implements HR
         throw e;
       }
       this.leases.renewLease(scannerName);
-      List<Result> results = new ArrayList<Result>();
-      s.nextRows(results, nbRows, HRegion.METRIC_NEXTSIZE); 
-      numReads.addAndGet(results.size());
-      // IF its filter if any is done with the scan
-      // and wants to tell the client to stop the scan.  This is done by passing
-      // a null result.
-      return s.isFilterDone() && results.isEmpty()?
-        null: results.toArray(new Result[0]);
+      Result[] results = s.nextRows(nbRows, HRegion.METRIC_NEXTSIZE);
+      if (results != null) {
+        numReads.addAndGet(results.length);
+      }
+      return results;
     } catch (Throwable t) {
       if (t instanceof NotServingRegionException) {
         String scannerName = String.valueOf(scannerId);

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Threads.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Threads.java?rev=1416727&r1=1416726&r2=1416727&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Threads.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Threads.java Mon Dec  3 23:26:11 2012
@@ -22,6 +22,9 @@ package org.apache.hadoop.hbase.util;
 import java.io.PrintWriter;
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -204,6 +207,41 @@ public class Threads {
     return boundedCachedThreadPool;
   }
 
+  /**
+   * Creates a ThreadPoolExecutor with a bound on the number of tasks that can
+   * be submitted to it, determined by the blockingLimit parameter. Excess
+   * tasks block the calling thread until space frees up.
+   * 
+   * @param blockingLimit max number of tasks that can be submitted
+   * @param timeout time value after which unused threads are killed
+   * @param unit time unit for killing unused threads
+   * @param threadFactory thread factory to use to spawn threads
+   * @return the ThreadPoolExecutor
+   */
+  public static ThreadPoolExecutor getBlockingThreadPool(
+      int blockingLimit, long timeout, TimeUnit unit,
+      ThreadFactory threadFactory) {
+    ThreadPoolExecutor blockingThreadPool = 
+      new ThreadPoolExecutor( 
+        1, blockingLimit, timeout, TimeUnit.SECONDS,
+        new SynchronousQueue<Runnable>(),
+        threadFactory, 
+        new RejectedExecutionHandler() {  
+          @Override 
+          public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {  
+            try { 
+              // The submitting thread will block until the thread pool frees up. 
+              executor.getQueue().put(r); 
+            } catch (InterruptedException e) {  
+              throw new RejectedExecutionException( 
+                  "Failed to requeue the rejected request because of ", e); 
+            } 
+          } 
+        });
+    blockingThreadPool.allowCoreThreadTimeOut(true);
+    return blockingThreadPool;
+  }
+
   public static void renameThread(Thread t, String newName) {
     String oldName = t.getName();
     if (!t.equals(newName)) {
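
Because getBlockingThreadPool() is built on a zero-capacity SynchronousQueue, a submit succeeds only if an idle worker can take the task directly; once blockingLimit workers are busy, the RejectedExecutionHandler re-queues the task with put(), which blocks the submitting thread until a worker frees up. A usage sketch (assuming, as in the HRegionServer change above, that DaemonThreadFactory takes a thread-name prefix):

    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    // blockingLimit == 2: at most two tasks in flight at once
    ThreadPoolExecutor pool = Threads.getBlockingThreadPool(
        2, 60, TimeUnit.SECONDS, new DaemonThreadFactory("demo-"));
    for (int i = 0; i < 5; i++) {
      // once both workers are busy, submit() blocks here until one finishes
      pool.submit(new Runnable() {
        public void run() {
          try { Thread.sleep(1000); } catch (InterruptedException e) { }
        }
      });
    }
    pool.shutdown();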

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/Benchmark.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/Benchmark.java?rev=1416727&r1=1416726&r2=1416727&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/Benchmark.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/Benchmark.java Mon Dec  3 23:26:11 2012
@@ -219,11 +219,16 @@ public abstract class Benchmark {
         // bulk load some data into the tables
         long numKVsInRegion = Math.round(numKVs * 1.0 / numRegions);
         for (HRegionInfo hRegionInfo : regionsToRS.keySet()) {
-          // skip the first region which has an empty start key
-          if ("".equals(new String(hRegionInfo.getStartKey()))) {
+          // skip the first region which has an empty start key in case of 
+          // multiple regions
+          if (numRegions > 1 && 
+              "".equals(new String(hRegionInfo.getStartKey()))) {
             continue;
           }
-          long startKey = getLongFromRowKey(hRegionInfo.getStartKey());
+          long startKey = 0;
+          try {
+            startKey = getLongFromRowKey(hRegionInfo.getStartKey());
+          } catch (NumberFormatException e) { }
           long rowID = startKey;
           for (; rowID < startKey + numKVsInRegion; rowID++) {
             byte[] row = getRowKeyFromLong(rowID);
@@ -247,7 +252,8 @@ public abstract class Benchmark {
       long numKVsInRegion = Math.round(numKVs * 1.0 / numRegions);
       for (HRegionInfo hRegionInfo : regionsToRS.keySet()) {
         // skip the first region which has an empty start key
-        if ("".equals(new String(hRegionInfo.getStartKey()))) {
+        if (numRegions > 1 && 
+            "".equals(new String(hRegionInfo.getStartKey()))) {
           continue;
         }
         // get the region server
@@ -301,7 +307,10 @@ public abstract class Benchmark {
     byte [] value = new byte[kvSize];
     (new Random()).nextBytes(value);
 
-    long startKey = getLongFromRowKey(hRegionInfo.getStartKey());
+    long startKey = 0;
+    try {
+      startKey = getLongFromRowKey(hRegionInfo.getStartKey());
+    } catch (NumberFormatException e) { }
     long rowID = startKey;
     for (; rowID < startKey + numKVsInRegion; rowID++) {
       byte[] row = getRowKeyFromLong(rowID);

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/ScanBenchmark.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/ScanBenchmark.java?rev=1416727&r1=1416726&r2=1416727&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/ScanBenchmark.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/ScanBenchmark.java Mon Dec  3 23:26:11 2012
@@ -31,7 +31,7 @@ public class ScanBenchmark extends Bench
     13000, 14000, 15000, 16000,
     17000, 18000, 19000, 20000
   };
-  private static Integer[] SET_PREFETCH_VALUES = { 0 };
+  private static Integer[] SET_PREFETCH_VALUES = { 0, 1 };
   
   public void initBenchmarkResults() {
     List<String> header = new ArrayList<String>();
@@ -49,7 +49,7 @@ public class ScanBenchmark extends Bench
     // warm block cache, force jit compilation
     System.out.println("Warming blockcache and forcing JIT compilation...");
     for (int i = 0; i < 20; i++) {
-      runExperiment(false, 10000, 0);  
+      runExperiment(false, 10000, 0);
     }
     for (int caching : SET_CACHING_VALUES) {  
       for (int prefetch : SET_PREFETCH_VALUES) {
@@ -71,6 +71,10 @@ public class ScanBenchmark extends Bench
     scan.setMaxVersions(1);
     // set caching
     scan.setCaching(caching);
+    // set prefetch if needed
+    if (prefetch > 0) {
+      scan.setServerPrefetching(true);
+    }
     
     long numKVs = 0;
     long numBytes = 0;

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=1416727&r1=1416726&r2=1416727&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java Mon Dec  3 23:26:11 2012
@@ -1541,8 +1541,7 @@ public class TestHRegion extends HBaseTe
     }
     RegionScanner rs = (RegionScanner) region.getScanner(scan);
     List<KeyValue> kvListScan = new ArrayList<KeyValue>();
-    List<Result> results = new ArrayList<Result>();
-    rs.nextRows(results, nbRows, null);
+    Result[] results = rs.nextRows(nbRows, null);
     for (Result res : results) {
       for (KeyValue kv : res.list()) {
         kvListScan.add(kv);