You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jx...@apache.org on 2013/05/25 00:51:23 UTC

svn commit: r1486246 - in /hbase/trunk: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ hbase-common/src/main/java/org/apache/hadoop/hbase/util/ hbase-protocol/src/main/java/org/ap...

Author: jxiang
Date: Fri May 24 22:51:22 2013
New Revision: 1486246

URL: http://svn.apache.org/r1486246
Log:
HBASE-8420 Port HBASE-6874 Implement prefetching for scanners from 0.89-fb

Added:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerHolder.java   (with props)
Modified:
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
    hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
    hbase/trunk/hbase-protocol/src/main/protobuf/Client.proto
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java?rev=1486246&r1=1486245&r2=1486246&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java Fri May 24 22:51:22 2013
@@ -128,7 +128,7 @@ public class ClientScanner extends Abstr
       }
 
       // initialize the scanner
-      nextScanner(this.caching, false);
+      nextScanner(false);
     }
 
     protected HConnection getConnection() {
@@ -169,10 +169,9 @@ public class ClientScanner extends Abstr
      * scanner at the scan.getStartRow().  We will go no further, just tidy
      * up outstanding scanners, if <code>currentRegion != null</code> and
      * <code>done</code> is true.
-     * @param nbRows
      * @param done Server-side says we're done scanning.
      */
-    private boolean nextScanner(int nbRows, final boolean done)
+    private boolean nextScanner(final boolean done)
     throws IOException {
       // Close the previous scanner if it's open
       if (this.callable != null) {
@@ -210,7 +209,7 @@ public class ClientScanner extends Abstr
           Bytes.toStringBinary(localStartKey) + "'");
       }
       try {
-        callable = getScannerCallable(localStartKey, nbRows);
+        callable = getScannerCallable(localStartKey);
         // Open a scanner on the region server starting at the
         // beginning of the region
         callable.withRetries();
@@ -225,12 +224,11 @@ public class ClientScanner extends Abstr
       return true;
     }
 
-    protected ScannerCallable getScannerCallable(byte [] localStartKey,
-        int nbRows) {
+    protected ScannerCallable getScannerCallable(byte [] localStartKey) {
       scan.setStartRow(localStartKey);
       ScannerCallable s = new ScannerCallable(getConnection(),
         getTableName(), scan, this.scanMetrics);
-      s.setCaching(nbRows);
+      s.setCaching(this.caching);
       return s;
     }
 
@@ -262,27 +260,21 @@ public class ClientScanner extends Abstr
         Result [] values = null;
         long remainingResultSize = maxScannerResultSize;
         int countdown = this.caching;
-        // We need to reset it if it's a new callable that was created
-        // with a countdown in nextScanner
-        callable.setCaching(this.caching);
+
         // This flag is set when we want to skip the result returned.  We do
         // this when we reset scanner because it split under us.
         boolean skipFirst = false;
         boolean retryAfterOutOfOrderException  = true;
         do {
           try {
-            if (skipFirst) {
-              // Skip only the first row (which was the last row of the last
-              // already-processed batch).
-              callable.setCaching(1);
-              values = callable.withRetries();
-              callable.setCaching(this.caching);
-              skipFirst = false;
-            }
             // Server returns a null values if scanning is to stop.  Else,
             // returns an empty array if scanning is to go on and we've just
             // exhausted current region.
             values = callable.withRetries();
+            if (skipFirst && values != null && values.length == 1) {
+              skipFirst = false; // Already skipped, unset it before scanning again
+              values = callable.withRetries();
+            }
             retryAfterOutOfOrderException  = true;
           } catch (DoNotRetryIOException e) {
             // DNRIOEs are thrown to make us break out of retries.  Some types of DNRIOEs want us
@@ -344,7 +336,15 @@ public class ClientScanner extends Abstr
           }
           lastNext = currentTime;
           if (values != null && values.length > 0) {
-            for (Result rs : values) {
+            int i = 0;
+            if (skipFirst) {
+              skipFirst = false;
+              // We will cache one row less, which is fine
+              countdown--;
+              i = 1;
+            }
+            for (; i < values.length; i++) {
+              Result rs = values[i];
               cache.add(rs);
               for (KeyValue kv : rs.raw()) {
                   remainingResultSize -= kv.heapSize();
@@ -354,7 +354,7 @@ public class ClientScanner extends Abstr
             }
           }
           // Values == null means server-side filter has determined we must STOP
-        } while (remainingResultSize > 0 && countdown > 0 && nextScanner(countdown, values == null));
+        } while (remainingResultSize > 0 && countdown > 0 && nextScanner(values == null));
       }
 
       if (cache.size() > 0) {
@@ -411,4 +411,12 @@ public class ClientScanner extends Abstr
       }
       closed = true;
     }
+
+    long currentScannerId() {
+      return (callable == null) ? -1L : callable.scannerId;
+    }
+
+    HRegionInfo currentRegionInfo() {
+      return currentRegion;
+    }
 }

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java?rev=1486246&r1=1486245&r2=1486246&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java Fri May 24 22:51:22 2013
@@ -116,6 +116,8 @@ public class Scan extends OperationWithA
     new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR);
   private Boolean loadColumnFamiliesOnDemand = null;
 
+  private boolean prefetching = true;
+
   /**
    * Create a Scan operation across all rows.
    */
@@ -168,6 +170,7 @@ public class Scan extends OperationWithA
     getScan = scan.isGetScan();
     filter = scan.getFilter(); // clone?
     loadColumnFamiliesOnDemand = scan.getLoadColumnFamiliesOnDemandValue();
+    prefetching = scan.getPrefetching();
     TimeRange ctr = scan.getTimeRange();
     tr = new TimeRange(ctr.getMin(), ctr.getMax());
     Map<byte[], NavigableSet<byte[]>> fams = scan.getFamilyMap();
@@ -201,6 +204,7 @@ public class Scan extends OperationWithA
     this.storeOffset = get.getRowOffsetPerColumnFamily();
     this.tr = get.getTimeRange();
     this.familyMap = get.getFamilyMap();
+    this.prefetching = false;
     this.getScan = true;
   }
 
@@ -364,6 +368,21 @@ public class Scan extends OperationWithA
   }
 
   /**
+   * Set if pre-fetching is enabled. If enabled, the region
+   * server will try to read the next scan result ahead of time. This
+   * improves scan performance if we are doing large scans.
+   *
+   * @param enablePrefetching if pre-fetching is enabled or not
+   */
+  public void setPrefetching(boolean enablePrefetching) {
+    this.prefetching = enablePrefetching;
+  }
+
+  public boolean getPrefetching() {
+    return prefetching;
+  }
+
+/**
    * @return the maximum result size in bytes. See {@link #setMaxResultSize(long)}
    */
   public long getMaxResultSize() {
@@ -613,6 +632,7 @@ public class Scan extends OperationWithA
     map.put("maxResultSize", this.maxResultSize);
     map.put("cacheBlocks", this.cacheBlocks);
     map.put("loadColumnFamiliesOnDemand", this.loadColumnFamiliesOnDemand);
+    map.put("prefetching", this.prefetching);
     List<Long> timeRange = new ArrayList<Long>();
     timeRange.add(this.tr.getMin());
     timeRange.add(this.tr.getMax());

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java?rev=1486246&r1=1486245&r2=1486246&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java Fri May 24 22:51:22 2013
@@ -60,7 +60,7 @@ public class ScannerCallable extends Ser
   public static final String LOG_SCANNER_ACTIVITY = "hbase.client.log.scanner.activity";
 
   public static final Log LOG = LogFactory.getLog(ScannerCallable.class);
-  private long scannerId = -1L;
+  long scannerId = -1L;
   private boolean instantiated = false;
   private boolean closed = false;
   private Scan scan;
@@ -130,6 +130,7 @@ public class ScannerCallable extends Ser
   /**
    * @see java.util.concurrent.Callable#call()
    */
+  @SuppressWarnings("deprecation")
   public Result [] call() throws IOException {
     if (closed) {
       if (scannerId != -1) {

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java?rev=1486246&r1=1486245&r2=1486246&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java Fri May 24 22:51:22 2013
@@ -709,6 +709,9 @@ public final class ProtobufUtil {
     if (scan.getBatch() > 0) {
       scanBuilder.setBatchSize(scan.getBatch());
     }
+    if (scan.getCaching() > 0) {
+      scanBuilder.setCachingCount(scan.getCaching());
+    }
     if (scan.getMaxResultSize() > 0) {
       scanBuilder.setMaxResultSize(scan.getMaxResultSize());
     }
@@ -716,6 +719,7 @@ public final class ProtobufUtil {
     if (loadColumnFamiliesOnDemand != null) {
       scanBuilder.setLoadColumnFamiliesOnDemand(loadColumnFamiliesOnDemand.booleanValue());
     }
+    scanBuilder.setPrefetching(scan.getPrefetching());
     scanBuilder.setMaxVersions(scan.getMaxVersions());
     TimeRange timeRange = scan.getTimeRange();
     if (!timeRange.isAllTime()) {
@@ -793,6 +797,9 @@ public final class ProtobufUtil {
     if (proto.hasMaxVersions()) {
       scan.setMaxVersions(proto.getMaxVersions());
     }
+    if (proto.hasPrefetching()) {
+      scan.setPrefetching(proto.getPrefetching());
+    }
     if (proto.hasStoreLimit()) {
       scan.setMaxResultsPerColumnFamily(proto.getStoreLimit());
     }
@@ -821,6 +828,9 @@ public final class ProtobufUtil {
     if (proto.hasBatchSize()) {
       scan.setBatch(proto.getBatchSize());
     }
+    if (proto.hasCachingCount()) {
+      scan.setCaching(proto.getCachingCount());
+    }
     if (proto.hasMaxResultSize()) {
       scan.setMaxResultSize(proto.getMaxResultSize());
     }

Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java?rev=1486246&r1=1486245&r2=1486246&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java Fri May 24 22:51:22 2013
@@ -21,6 +21,9 @@ package org.apache.hadoop.hbase.util;
 import java.io.PrintWriter;
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -180,8 +183,42 @@ public class Threads {
     boundedCachedThreadPool.allowCoreThreadTimeOut(true);
     return boundedCachedThreadPool;
   }
-  
-  
+
+  /**
+   * Creates a ThreadPoolExecutor which has a bound on the number of tasks that can be
+   * submitted to it, determined by the blockingLimit parameter. Excess tasks
+   * submitted will block on the calling thread till space frees up.
+   *
+   * @param blockingLimit max number of tasks that can be submitted
+   * @param timeout time value after which unused threads are killed
+   * @param unit time unit for killing unused threads
+   * @param threadFactory thread factory to use to spawn threads
+   * @return the ThreadPoolExecutor
+   */
+  public static ThreadPoolExecutor getBlockingThreadPool(
+      int blockingLimit, long timeout, TimeUnit unit,
+      ThreadFactory threadFactory) {
+    ThreadPoolExecutor blockingThreadPool =
+      new ThreadPoolExecutor(
+        1, blockingLimit, timeout, TimeUnit.SECONDS,
+        new SynchronousQueue<Runnable>(),
+        threadFactory,
+        new RejectedExecutionHandler() {
+          @Override
+          public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+            try {
+              // The submitting thread will block until the thread pool frees up.
+              executor.getQueue().put(r);
+            } catch (InterruptedException e) {
+              throw new RejectedExecutionException(
+                  "Failed to requeue the rejected request because of ", e);
+            }
+          }
+        });
+    blockingThreadPool.allowCoreThreadTimeOut(true);
+    return blockingThreadPool;
+  }
+
   /**
    * Returns a {@link java.util.concurrent.ThreadFactory} that names each created thread uniquely,
    * with a common prefix.

Modified: hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java?rev=1486246&r1=1486245&r2=1486246&view=diff
==============================================================================
--- hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java (original)
+++ hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java Fri May 24 22:51:22 2013
@@ -10622,6 +10622,14 @@ public final class ClientProtos {
     // optional bool loadColumnFamiliesOnDemand = 13;
     boolean hasLoadColumnFamiliesOnDemand();
     boolean getLoadColumnFamiliesOnDemand();
+    
+    // optional uint32 cachingCount = 14;
+    boolean hasCachingCount();
+    int getCachingCount();
+    
+    // optional bool prefetching = 15;
+    boolean hasPrefetching();
+    boolean getPrefetching();
   }
   public static final class Scan extends
       com.google.protobuf.GeneratedMessage
@@ -10810,6 +10818,26 @@ public final class ClientProtos {
       return loadColumnFamiliesOnDemand_;
     }
     
+    // optional uint32 cachingCount = 14;
+    public static final int CACHINGCOUNT_FIELD_NUMBER = 14;
+    private int cachingCount_;
+    public boolean hasCachingCount() {
+      return ((bitField0_ & 0x00000800) == 0x00000800);
+    }
+    public int getCachingCount() {
+      return cachingCount_;
+    }
+    
+    // optional bool prefetching = 15;
+    public static final int PREFETCHING_FIELD_NUMBER = 15;
+    private boolean prefetching_;
+    public boolean hasPrefetching() {
+      return ((bitField0_ & 0x00001000) == 0x00001000);
+    }
+    public boolean getPrefetching() {
+      return prefetching_;
+    }
+    
     private void initFields() {
       column_ = java.util.Collections.emptyList();
       attribute_ = java.util.Collections.emptyList();
@@ -10824,6 +10852,8 @@ public final class ClientProtos {
       storeLimit_ = 0;
       storeOffset_ = 0;
       loadColumnFamiliesOnDemand_ = false;
+      cachingCount_ = 0;
+      prefetching_ = false;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -10894,6 +10924,12 @@ public final class ClientProtos {
       if (((bitField0_ & 0x00000400) == 0x00000400)) {
         output.writeBool(13, loadColumnFamiliesOnDemand_);
       }
+      if (((bitField0_ & 0x00000800) == 0x00000800)) {
+        output.writeUInt32(14, cachingCount_);
+      }
+      if (((bitField0_ & 0x00001000) == 0x00001000)) {
+        output.writeBool(15, prefetching_);
+      }
       getUnknownFields().writeTo(output);
     }
     
@@ -10955,6 +10991,14 @@ public final class ClientProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeBoolSize(13, loadColumnFamiliesOnDemand_);
       }
+      if (((bitField0_ & 0x00000800) == 0x00000800)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeUInt32Size(14, cachingCount_);
+      }
+      if (((bitField0_ & 0x00001000) == 0x00001000)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBoolSize(15, prefetching_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -11037,6 +11081,16 @@ public final class ClientProtos {
         result = result && (getLoadColumnFamiliesOnDemand()
             == other.getLoadColumnFamiliesOnDemand());
       }
+      result = result && (hasCachingCount() == other.hasCachingCount());
+      if (hasCachingCount()) {
+        result = result && (getCachingCount()
+            == other.getCachingCount());
+      }
+      result = result && (hasPrefetching() == other.hasPrefetching());
+      if (hasPrefetching()) {
+        result = result && (getPrefetching()
+            == other.getPrefetching());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -11098,6 +11152,14 @@ public final class ClientProtos {
         hash = (37 * hash) + LOADCOLUMNFAMILIESONDEMAND_FIELD_NUMBER;
         hash = (53 * hash) + hashBoolean(getLoadColumnFamiliesOnDemand());
       }
+      if (hasCachingCount()) {
+        hash = (37 * hash) + CACHINGCOUNT_FIELD_NUMBER;
+        hash = (53 * hash) + getCachingCount();
+      }
+      if (hasPrefetching()) {
+        hash = (37 * hash) + PREFETCHING_FIELD_NUMBER;
+        hash = (53 * hash) + hashBoolean(getPrefetching());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       return hash;
     }
@@ -11260,6 +11322,10 @@ public final class ClientProtos {
         bitField0_ = (bitField0_ & ~0x00000800);
         loadColumnFamiliesOnDemand_ = false;
         bitField0_ = (bitField0_ & ~0x00001000);
+        cachingCount_ = 0;
+        bitField0_ = (bitField0_ & ~0x00002000);
+        prefetching_ = false;
+        bitField0_ = (bitField0_ & ~0x00004000);
         return this;
       }
       
@@ -11368,6 +11434,14 @@ public final class ClientProtos {
           to_bitField0_ |= 0x00000400;
         }
         result.loadColumnFamiliesOnDemand_ = loadColumnFamiliesOnDemand_;
+        if (((from_bitField0_ & 0x00002000) == 0x00002000)) {
+          to_bitField0_ |= 0x00000800;
+        }
+        result.cachingCount_ = cachingCount_;
+        if (((from_bitField0_ & 0x00004000) == 0x00004000)) {
+          to_bitField0_ |= 0x00001000;
+        }
+        result.prefetching_ = prefetching_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -11469,6 +11543,12 @@ public final class ClientProtos {
         if (other.hasLoadColumnFamiliesOnDemand()) {
           setLoadColumnFamiliesOnDemand(other.getLoadColumnFamiliesOnDemand());
         }
+        if (other.hasCachingCount()) {
+          setCachingCount(other.getCachingCount());
+        }
+        if (other.hasPrefetching()) {
+          setPrefetching(other.getPrefetching());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -11593,6 +11673,16 @@ public final class ClientProtos {
               loadColumnFamiliesOnDemand_ = input.readBool();
               break;
             }
+            case 112: {
+              bitField0_ |= 0x00002000;
+              cachingCount_ = input.readUInt32();
+              break;
+            }
+            case 120: {
+              bitField0_ |= 0x00004000;
+              prefetching_ = input.readBool();
+              break;
+            }
           }
         }
       }
@@ -12346,6 +12436,48 @@ public final class ClientProtos {
         return this;
       }
       
+      // optional uint32 cachingCount = 14;
+      private int cachingCount_ ;
+      public boolean hasCachingCount() {
+        return ((bitField0_ & 0x00002000) == 0x00002000);
+      }
+      public int getCachingCount() {
+        return cachingCount_;
+      }
+      public Builder setCachingCount(int value) {
+        bitField0_ |= 0x00002000;
+        cachingCount_ = value;
+        onChanged();
+        return this;
+      }
+      public Builder clearCachingCount() {
+        bitField0_ = (bitField0_ & ~0x00002000);
+        cachingCount_ = 0;
+        onChanged();
+        return this;
+      }
+      
+      // optional bool prefetching = 15;
+      private boolean prefetching_ ;
+      public boolean hasPrefetching() {
+        return ((bitField0_ & 0x00004000) == 0x00004000);
+      }
+      public boolean getPrefetching() {
+        return prefetching_;
+      }
+      public Builder setPrefetching(boolean value) {
+        bitField0_ |= 0x00004000;
+        prefetching_ = value;
+        onChanged();
+        return this;
+      }
+      public Builder clearPrefetching() {
+        bitField0_ = (bitField0_ & ~0x00004000);
+        prefetching_ = false;
+        onChanged();
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:Scan)
     }
     
@@ -21467,7 +21599,7 @@ public final class ClientProtos {
       "ation\030\002 \002(\0132\016.MutationProto\022\035\n\tcondition" +
       "\030\003 \001(\0132\n.Condition\"<\n\016MutateResponse\022\027\n\006" +
       "result\030\001 \001(\0132\007.Result\022\021\n\tprocessed\030\002 \001(\010" +
-      "\"\307\002\n\004Scan\022\027\n\006column\030\001 \003(\0132\007.Column\022!\n\tat" +
+      "\"\362\002\n\004Scan\022\027\n\006column\030\001 \003(\0132\007.Column\022!\n\tat" +
       "tribute\030\002 \003(\0132\016.NameBytesPair\022\020\n\010startRo" +
       "w\030\003 \001(\014\022\017\n\007stopRow\030\004 \001(\014\022\027\n\006filter\030\005 \001(\013" +
       "2\007.Filter\022\035\n\ttimeRange\030\006 \001(\0132\n.TimeRange" +
@@ -21475,45 +21607,46 @@ public final class ClientProtos {
       "\010 \001(\010:\004true\022\021\n\tbatchSize\030\t \001(\r\022\025\n\rmaxRes" +
       "ultSize\030\n \001(\004\022\022\n\nstoreLimit\030\013 \001(\r\022\023\n\013sto" +
       "reOffset\030\014 \001(\r\022\"\n\032loadColumnFamiliesOnDe" +
-      "mand\030\r \001(\010\"\230\001\n\013ScanRequest\022 \n\006region\030\001 \001" +
-      "(\0132\020.RegionSpecifier\022\023\n\004scan\030\002 \001(\0132\005.Sca" +
-      "n\022\021\n\tscannerId\030\003 \001(\004\022\024\n\014numberOfRows\030\004 \001" +
-      "(\r\022\024\n\014closeScanner\030\005 \001(\010\022\023\n\013nextCallSeq\030" +
-      "\006 \001(\004\"l\n\014ScanResponse\022\'\n\016resultCellMeta\030" +
-      "\001 \001(\0132\017.ResultCellMeta\022\021\n\tscannerId\030\002 \001(" +
-      "\004\022\023\n\013moreResults\030\003 \001(\010\022\013\n\003ttl\030\004 \001(\r\"%\n\016R",
-      "esultCellMeta\022\023\n\013cellsLength\030\001 \003(\r\"\260\001\n\024B" +
-      "ulkLoadHFileRequest\022 \n\006region\030\001 \002(\0132\020.Re" +
-      "gionSpecifier\0224\n\nfamilyPath\030\002 \003(\0132 .Bulk" +
-      "LoadHFileRequest.FamilyPath\022\024\n\014assignSeq" +
-      "Num\030\003 \001(\010\032*\n\nFamilyPath\022\016\n\006family\030\001 \002(\014\022" +
-      "\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFileResponse\022\016" +
-      "\n\006loaded\030\001 \002(\010\"_\n\026CoprocessorServiceCall" +
-      "\022\013\n\003row\030\001 \002(\014\022\023\n\013serviceName\030\002 \002(\t\022\022\n\nme" +
-      "thodName\030\003 \002(\t\022\017\n\007request\030\004 \002(\014\"d\n\031Copro" +
-      "cessorServiceRequest\022 \n\006region\030\001 \002(\0132\020.R",
-      "egionSpecifier\022%\n\004call\030\002 \002(\0132\027.Coprocess" +
-      "orServiceCall\"]\n\032CoprocessorServiceRespo" +
-      "nse\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\035\n" +
-      "\005value\030\002 \002(\0132\016.NameBytesPair\"B\n\013MultiAct" +
-      "ion\022 \n\010mutation\030\001 \001(\0132\016.MutationProto\022\021\n" +
-      "\003get\030\002 \001(\0132\004.Get\"I\n\014ActionResult\022\026\n\005valu" +
-      "e\030\001 \001(\0132\007.Result\022!\n\texception\030\002 \001(\0132\016.Na" +
-      "meBytesPair\"^\n\014MultiRequest\022 \n\006region\030\001 " +
-      "\002(\0132\020.RegionSpecifier\022\034\n\006action\030\002 \003(\0132\014." +
-      "MultiAction\022\016\n\006atomic\030\003 \001(\010\".\n\rMultiResp",
-      "onse\022\035\n\006result\030\001 \003(\0132\r.ActionResult2\342\002\n\r" +
-      "ClientService\022 \n\003get\022\013.GetRequest\032\014.GetR" +
-      "esponse\022/\n\010multiGet\022\020.MultiGetRequest\032\021." +
-      "MultiGetResponse\022)\n\006mutate\022\016.MutateReque" +
-      "st\032\017.MutateResponse\022#\n\004scan\022\014.ScanReques" +
-      "t\032\r.ScanResponse\022>\n\rbulkLoadHFile\022\025.Bulk" +
-      "LoadHFileRequest\032\026.BulkLoadHFileResponse" +
-      "\022F\n\013execService\022\032.CoprocessorServiceRequ" +
-      "est\032\033.CoprocessorServiceResponse\022&\n\005mult" +
-      "i\022\r.MultiRequest\032\016.MultiResponseBB\n*org.",
-      "apache.hadoop.hbase.protobuf.generatedB\014" +
-      "ClientProtosH\001\210\001\001\240\001\001"
+      "mand\030\r \001(\010\022\024\n\014cachingCount\030\016 \001(\r\022\023\n\013pref" +
+      "etching\030\017 \001(\010\"\230\001\n\013ScanRequest\022 \n\006region\030" +
+      "\001 \001(\0132\020.RegionSpecifier\022\023\n\004scan\030\002 \001(\0132\005." +
+      "Scan\022\021\n\tscannerId\030\003 \001(\004\022\024\n\014numberOfRows\030" +
+      "\004 \001(\r\022\024\n\014closeScanner\030\005 \001(\010\022\023\n\013nextCallS" +
+      "eq\030\006 \001(\004\"l\n\014ScanResponse\022\'\n\016resultCellMe" +
+      "ta\030\001 \001(\0132\017.ResultCellMeta\022\021\n\tscannerId\030\002",
+      " \001(\004\022\023\n\013moreResults\030\003 \001(\010\022\013\n\003ttl\030\004 \001(\r\"%" +
+      "\n\016ResultCellMeta\022\023\n\013cellsLength\030\001 \003(\r\"\260\001" +
+      "\n\024BulkLoadHFileRequest\022 \n\006region\030\001 \002(\0132\020" +
+      ".RegionSpecifier\0224\n\nfamilyPath\030\002 \003(\0132 .B" +
+      "ulkLoadHFileRequest.FamilyPath\022\024\n\014assign" +
+      "SeqNum\030\003 \001(\010\032*\n\nFamilyPath\022\016\n\006family\030\001 \002" +
+      "(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFileRespons" +
+      "e\022\016\n\006loaded\030\001 \002(\010\"_\n\026CoprocessorServiceC" +
+      "all\022\013\n\003row\030\001 \002(\014\022\023\n\013serviceName\030\002 \002(\t\022\022\n" +
+      "\nmethodName\030\003 \002(\t\022\017\n\007request\030\004 \002(\014\"d\n\031Co",
+      "processorServiceRequest\022 \n\006region\030\001 \002(\0132" +
+      "\020.RegionSpecifier\022%\n\004call\030\002 \002(\0132\027.Coproc" +
+      "essorServiceCall\"]\n\032CoprocessorServiceRe" +
+      "sponse\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier" +
+      "\022\035\n\005value\030\002 \002(\0132\016.NameBytesPair\"B\n\013Multi" +
+      "Action\022 \n\010mutation\030\001 \001(\0132\016.MutationProto" +
+      "\022\021\n\003get\030\002 \001(\0132\004.Get\"I\n\014ActionResult\022\026\n\005v" +
+      "alue\030\001 \001(\0132\007.Result\022!\n\texception\030\002 \001(\0132\016" +
+      ".NameBytesPair\"^\n\014MultiRequest\022 \n\006region" +
+      "\030\001 \002(\0132\020.RegionSpecifier\022\034\n\006action\030\002 \003(\013",
+      "2\014.MultiAction\022\016\n\006atomic\030\003 \001(\010\".\n\rMultiR" +
+      "esponse\022\035\n\006result\030\001 \003(\0132\r.ActionResult2\342" +
+      "\002\n\rClientService\022 \n\003get\022\013.GetRequest\032\014.G" +
+      "etResponse\022/\n\010multiGet\022\020.MultiGetRequest" +
+      "\032\021.MultiGetResponse\022)\n\006mutate\022\016.MutateRe" +
+      "quest\032\017.MutateResponse\022#\n\004scan\022\014.ScanReq" +
+      "uest\032\r.ScanResponse\022>\n\rbulkLoadHFile\022\025.B" +
+      "ulkLoadHFileRequest\032\026.BulkLoadHFileRespo" +
+      "nse\022F\n\013execService\022\032.CoprocessorServiceR" +
+      "equest\032\033.CoprocessorServiceResponse\022&\n\005m",
+      "ulti\022\r.MultiRequest\032\016.MultiResponseBB\n*o" +
+      "rg.apache.hadoop.hbase.protobuf.generate" +
+      "dB\014ClientProtosH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -21629,7 +21762,7 @@ public final class ClientProtos {
           internal_static_Scan_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_Scan_descriptor,
-              new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", },
+              new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", "CachingCount", "Prefetching", },
               org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.class,
               org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.Builder.class);
           internal_static_ScanRequest_descriptor =

Modified: hbase/trunk/hbase-protocol/src/main/protobuf/Client.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-protocol/src/main/protobuf/Client.proto?rev=1486246&r1=1486245&r2=1486246&view=diff
==============================================================================
--- hbase/trunk/hbase-protocol/src/main/protobuf/Client.proto (original)
+++ hbase/trunk/hbase-protocol/src/main/protobuf/Client.proto Fri May 24 22:51:22 2013
@@ -233,6 +233,8 @@ message Scan {
   optional uint32 storeLimit = 11;
   optional uint32 storeOffset = 12;
   optional bool loadColumnFamiliesOnDemand = 13; /* DO NOT add defaults to loadColumnFamiliesOnDemand. */
+  optional uint32 cachingCount = 14;
+  optional bool prefetching = 15;
 }
 
 /**

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1486246&r1=1486245&r2=1486246&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Fri May 24 22:51:22 2013
@@ -185,6 +185,7 @@ import com.google.protobuf.Service;
  * defines the keyspace for this HRegion.
  */
 @InterfaceAudience.Private
+@SuppressWarnings("deprecation")
 public class HRegion implements HeapSize { // , Writable{
   public static final Log LOG = LogFactory.getLog(HRegion.class);
 
@@ -3603,7 +3604,6 @@ public class HRegion implements HeapSize
       return returnResult;
     }
 
-
     private void populateFromJoinedHeap(List<KeyValue> results, int limit)
         throws IOException {
       assert joinedContinuationRow != null;

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1486246&r1=1486245&r2=1486246&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Fri May 24 22:51:22 2013
@@ -45,6 +45,8 @@ import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.management.ObjectName;
@@ -59,12 +61,12 @@ import org.apache.hadoop.hbase.CellScann
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.Chore;
+import org.apache.hadoop.hbase.DaemonThreadFactory;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.HealthCheckChore;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.Stoppable;
@@ -164,8 +166,8 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultCellMeta;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultCellMeta;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
@@ -480,6 +482,11 @@ public class HRegionServer implements Cl
   private TableLockManager tableLockManager;
 
   /**
+   * Threadpool for doing scanner prefetches
+   */
+  protected ThreadPoolExecutor scanPrefetchThreadPool;
+
+  /**
    * Starts a HRegionServer at the default location
    *
    * @param conf
@@ -616,14 +623,18 @@ public class HRegionServer implements Cl
   }
 
   RegionScanner getScanner(long scannerId) {
-    String scannerIdString = Long.toString(scannerId);
-    RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
+    RegionScannerHolder scannerHolder = getScannerHolder(scannerId);
     if (scannerHolder != null) {
-      return scannerHolder.s;
+      return scannerHolder.scanner;
     }
     return null;
   }
 
+  public RegionScannerHolder getScannerHolder(long scannerId) {
+    String scannerIdString = Long.toString(scannerId);
+    return scanners.get(scannerIdString);
+  }
+
   /**
    * All initialization needed before we go register with Master.
    *
@@ -837,6 +848,11 @@ public class HRegionServer implements Cl
     if (this.thriftServer != null) this.thriftServer.shutdown();
     this.leases.closeAfterLeasesExpire();
     this.rpcServer.stop();
+
+    if (scanPrefetchThreadPool != null) {
+      // shutdown the prefetch threads
+      scanPrefetchThreadPool.shutdownNow();
+    }
     if (this.splitLogWorker != null) {
       splitLogWorker.stop();
     }
@@ -1107,7 +1123,7 @@ public class HRegionServer implements Cl
     // exception next time they come in.
     for (Map.Entry<String, RegionScannerHolder> e : this.scanners.entrySet()) {
       try {
-        e.getValue().s.close();
+        e.getValue().closeScanner();
       } catch (IOException ioe) {
         LOG.warn("Closing scanner " + e.getKey(), ioe);
       }
@@ -1537,6 +1553,14 @@ public class HRegionServer implements Cl
       this.replicationSinkHandler.startReplicationService();
     }
 
+    // start the scanner prefetch threadpool
+    int numHandlers = conf.getInt("hbase.regionserver.prefetcher.threads.max",
+      conf.getInt("hbase.regionserver.handler.count", 10)
+        + conf.getInt("hbase.regionserver.metahandler.count", 10));
+    scanPrefetchThreadPool =
+      Threads.getBlockingThreadPool(numHandlers, 60, TimeUnit.SECONDS,
+        new DaemonThreadFactory(RegionScannerHolder.PREFETCHER_THREAD_PREFIX));
+
     // Start Server.  This service is like leases in that it internally runs
     // a thread.
     this.rpcServer.start();
@@ -1831,8 +1855,7 @@ public class HRegionServer implements Cl
         continue;
       }
 
-      InetSocketAddress isa =
-        new InetSocketAddress(sn.getHostname(), sn.getPort());
+      new InetSocketAddress(sn.getHostname(), sn.getPort());
 
       LOG.info("Attempting connect to Master server at " +
         this.masterAddressManager.getMasterAddress());
@@ -2325,7 +2348,7 @@ public class HRegionServer implements Cl
     public void leaseExpired() {
       RegionScannerHolder rsh = scanners.remove(this.scannerName);
       if (rsh != null) {
-        RegionScanner s = rsh.s;
+        RegionScanner s = rsh.scanner;
         LOG.info("Scanner " + this.scannerName + " lease expired on region "
             + s.getRegionInfo().getRegionNameAsString());
         try {
@@ -2334,7 +2357,7 @@ public class HRegionServer implements Cl
             region.getCoprocessorHost().preScannerClose(s);
           }
 
-          s.close();
+          rsh.closeScanner();
           if (region != null && region.getCoprocessorHost() != null) {
             region.getCoprocessorHost().postScannerClose(s);
           }
@@ -2638,20 +2661,22 @@ public class HRegionServer implements Cl
     return this.fsOk;
   }
 
-  protected long addScanner(RegionScanner s) throws LeaseStillHeldException {
+  protected RegionScannerHolder addScanner(
+      RegionScanner s, HRegion r) throws LeaseStillHeldException {
+    RegionScannerHolder holder = new RegionScannerHolder(this, s, r);
+    String scannerName = null;
     long scannerId = -1;
     while (true) {
-      scannerId = rand.nextLong();
-      if (scannerId == -1) continue;
-      String scannerName = String.valueOf(scannerId);
-      RegionScannerHolder existing = scanners.putIfAbsent(scannerName, new RegionScannerHolder(s));
+      scannerId = nextLong();
+      scannerName = String.valueOf(scannerId);
+      RegionScannerHolder existing = scanners.putIfAbsent(scannerName, holder);
       if (existing == null) {
+        holder.scannerName = scannerName;
         this.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
-            new ScannerListener(scannerName));
-        break;
+          new ScannerListener(scannerName));
+        return holder;
       }
     }
-    return scannerId;
   }
 
   /**
@@ -2913,7 +2938,6 @@ public class HRegionServer implements Cl
   @Override
   public ScanResponse scan(final RpcController controller,
       final ScanRequest request) throws ServiceException {
-    Leases.Lease lease = null;
     String scannerName = null;
     try {
       if (!request.hasScannerId() && !request.hasScan()) {
@@ -2963,7 +2987,10 @@ public class HRegionServer implements Cl
             throw new UnknownScannerException(
               "Name: " + scannerName + ", already closed?");
           }
-          scanner = rsh.s;
+          scanner = rsh.scanner;
+          // Use the region found in the online region list,
+          // not that one in the RegionScannerHolder. So that we can
+          // make sure the region is still open in this region server.
           region = getRegion(scanner.getRegionInfo().getRegionName());
         } else {
           region = getRegion(request.getRegion());
@@ -2974,7 +3001,6 @@ public class HRegionServer implements Cl
           if (!isLoadingCfsOnDemandSet) {
             scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
           }
-          byte[] hasMetrics = scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE);
           region.prepareScanner(scan);
           if (region.getCoprocessorHost() != null) {
             scanner = region.getCoprocessorHost().preScannerOpen(scan);
@@ -2985,9 +3011,14 @@ public class HRegionServer implements Cl
           if (region.getCoprocessorHost() != null) {
             scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
           }
-          scannerId = addScanner(scanner);
-          scannerName = String.valueOf(scannerId);
+          rsh = addScanner(scanner, region);
+          scannerName = rsh.scannerName;
+          scannerId = Long.parseLong(scannerName);
+
           ttl = this.scannerLeaseTimeoutPeriod;
+          if (scan.getPrefetching()) {
+            rsh.enablePrefetching(scan.getCaching());
+          }
         }
 
         if (rows > 0) {
@@ -2995,110 +3026,34 @@ public class HRegionServer implements Cl
           // performed even before checking of Lease.
           // See HBASE-5974
           if (request.hasNextCallSeq()) {
-            if (rsh == null) {
-              rsh = scanners.get(scannerName);
-            }
-            if (rsh != null) {
-              if (request.getNextCallSeq() != rsh.nextCallSeq) {
-                throw new OutOfOrderScannerNextException("Expected nextCallSeq: " + rsh.nextCallSeq
-                  + " But the nextCallSeq got from client: " + request.getNextCallSeq() +
-                  "; request=" + TextFormat.shortDebugString(request));
-              }
-              // Increment the nextCallSeq value which is the next expected from client.
-              rsh.nextCallSeq++;
+            if (request.getNextCallSeq() != rsh.nextCallSeq) {
+              throw new OutOfOrderScannerNextException("Expected nextCallSeq: " + rsh.nextCallSeq
+                + " But the nextCallSeq got from client: " + request.getNextCallSeq() +
+                "; request=" + TextFormat.shortDebugString(request));
             }
+            // Increment the nextCallSeq value which is the next expected from client.
+            rsh.nextCallSeq++;
           }
-          try {
-            // Remove lease while its being processed in server; protects against case
-            // where processing of request takes > lease expiration time.
-            lease = leases.removeLease(scannerName);
-            List<Result> results = new ArrayList<Result>(rows);
-            long currentScanResultSize = 0;
-
-            boolean done = false;
-            // Call coprocessor. Get region info from scanner.
-            if (region != null && region.getCoprocessorHost() != null) {
-              Boolean bypass = region.getCoprocessorHost().preScannerNext(
-                scanner, results, rows);
-              if (!results.isEmpty()) {
-                for (Result r : results) {
-                  if (maxScannerResultSize < Long.MAX_VALUE){
-                    for (KeyValue kv : r.raw()) {
-                      currentScanResultSize += kv.heapSize();
-                    }
-                  }
-                }
-              }
-              if (bypass != null && bypass.booleanValue()) {
-                done = true;
-              }
-            }
 
-            if (!done) {
-              long maxResultSize = scanner.getMaxResultSize();
-              if (maxResultSize <= 0) {
-                maxResultSize = maxScannerResultSize;
-              }
-              List<KeyValue> values = new ArrayList<KeyValue>();
-              MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint());
-              region.startRegionOperation(Operation.SCAN);
-              try {
-                int i = 0;
-                synchronized(scanner) {
-                  for (; i < rows
-                      && currentScanResultSize < maxResultSize; i++) {
-                    // Collect values to be returned here
-                    boolean moreRows = scanner.nextRaw(values);
-                    if (!values.isEmpty()) {
-                      if (maxScannerResultSize < Long.MAX_VALUE){
-                        for (KeyValue kv : values) {
-                          currentScanResultSize += kv.heapSize();
-                        }
-                      }
-                      results.add(new Result(values));
-                    }
-                    if (!moreRows) {
-                      break;
-                    }
-                    values.clear();
-                  }
-                }
-                region.readRequestsCount.add(i);
-              } finally {
-                region.closeRegionOperation();
-              }
-
-              // coprocessor postNext hook
-              if (region != null && region.getCoprocessorHost() != null) {
-                region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
-              }
-            }
+          ttl = this.scannerLeaseTimeoutPeriod;
+          ScanResult result = rsh.getScanResult(rows);
+          if (result.isException) {
+            throw result.ioException;
+          }
 
-            // If the scanner's filter - if any - is done with the scan
-            // and wants to tell the client to stop the scan. This is done by passing
-            // a null result, and setting moreResults to false.
-            if (scanner.isFilterDone() && results.isEmpty()) {
-              moreResults = false;
-              results = null;
-            } else {
-              ResultCellMeta.Builder rcmBuilder = ResultCellMeta.newBuilder();
-              List<CellScannable> cellScannables = new ArrayList<CellScannable>(results.size());
-              for (Result res : results) {
-                cellScannables.add(res);
-                rcmBuilder.addCellsLength(res.size());
-              }
-              builder.setResultCellMeta(rcmBuilder.build());
-              // TODO is this okey to assume the type and cast
-              ((PayloadCarryingRpcController) controller).setCellScanner(CellUtil
-                  .createCellScanner(cellScannables));
-            }
-          } finally {
-            // We're done. On way out re-add the above removed lease.
-            // Adding resets expiration time on lease.
-            if (scanners.containsKey(scannerName)) {
-              if (lease != null) leases.addLease(lease);
-              ttl = this.scannerLeaseTimeoutPeriod;
+          moreResults = result.moreResults;
+          if (result.results != null) {
+            List<CellScannable> cellScannables =
+              new ArrayList<CellScannable>(result.results.size());
+            ResultCellMeta.Builder rcmBuilder = ResultCellMeta.newBuilder();
+            for (Result res : result.results) {
+              cellScannables.add(res);
+              rcmBuilder.addCellsLength(res.size());
             }
+            builder.setResultCellMeta(rcmBuilder.build());
+            // TODO is this okey to assume the type and cast
+            ((PayloadCarryingRpcController) controller).setCellScanner(CellUtil
+              .createCellScanner(cellScannables));
           }
         }
 
@@ -3112,9 +3067,13 @@ public class HRegionServer implements Cl
           }
           rsh = scanners.remove(scannerName);
           if (rsh != null) {
-            scanner = rsh.s;
-            scanner.close();
-            leases.cancelLease(scannerName);
+            rsh.closeScanner();
+            try {
+              leases.cancelLease(scannerName);
+            } catch (LeaseException le) {
+              // That's ok, since the lease may be gone with
+              // the prefetcher when cancelled.
+            }
             if (region != null && region.getCoprocessorHost() != null) {
               region.getCoprocessorHost().postScannerClose(scanner);
             }
@@ -4181,18 +4140,6 @@ public class HRegionServer implements Cl
     return ZKUtil.joinZNode(this.zooKeeper.rsZNode, getServerName().toString());
   }
 
-  /**
-   * Holder class which holds the RegionScanner and nextCallSeq together.
-   */
-  private static class RegionScannerHolder {
-    private RegionScanner s;
-    private long nextCallSeq = 0L;
-
-    public RegionScannerHolder(RegionScanner s) {
-      this.s = s;
-    }
-  }
-
   private boolean isHealthCheckerConfigured() {
     String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
     return org.apache.commons.lang.StringUtils.isNotBlank(healthScriptLocation);

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerHolder.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerHolder.java?rev=1486246&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerHolder.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerHolder.java Fri May 24 22:51:22 2013
@@ -0,0 +1,394 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Holder class which holds the RegionScanner, nextCallSeq, ScanPrefetcher
+ * and information needed for prefetcher/fetcher.
+ *
+ * Originally, this is an inner class of HRegionServer. We moved it out
+ * since HRegionServer is getting bigger and bigger.
+ */
+@InterfaceAudience.Private
+public class RegionScannerHolder {
+  public final static String MAX_PREFETCHED_RESULT_SIZE_KEY
+    = "hbase.hregionserver.prefetcher.resultsize.max";
+  public final static int MAX_PREFETCHED_RESULT_SIZE_DEFAULT = 256 * 1024 * 1024;
+
+  final static Log LOG = LogFactory.getLog(RegionScannerHolder.class);
+  final static String PREFETCHER_THREAD_PREFIX = "scan-prefetch-";
+
+  private final static AtomicLong globalPrefetchedResultSize = new AtomicLong();
+
+  private ThreadPoolExecutor scanPrefetchThreadPool;
+  private Map<String, RegionScannerHolder> scanners;
+  private long maxScannerResultSize;
+  private Configuration conf;
+  private Leases leases;
+
+  private boolean prefetching = false;
+  private long maxGlobalPrefetchedResultSize;
+  private volatile Future<ScanResult> prefetchScanFuture;
+  private volatile long prefetchedResultSize;
+  private ScanPrefetcher prefetcher;
+  private HRegion region;
+  private int rows;
+
+  RegionScanner scanner;
+  long nextCallSeq = 0L;
+  String scannerName;
+
+  /**
+   * Get the total size of all prefetched results not retrieved yet.
+   */
+  public static long getPrefetchedResultSize() {
+    return globalPrefetchedResultSize.get();
+  }
+
+  /**
+   * Construct a RegionScanner holder for a specific region server.
+   *
+   * @param rs the region server the specific region is on
+   * @param s the scanner to be held
+   * @param r the region the scanner is for
+   */
+  RegionScannerHolder(HRegionServer rs, RegionScanner s, HRegion r) {
+    scanPrefetchThreadPool = rs.scanPrefetchThreadPool;
+    maxScannerResultSize = rs.maxScannerResultSize;
+    prefetcher = new ScanPrefetcher();
+    scanners = rs.scanners;
+    leases = rs.leases;
+    conf = rs.conf;
+    scanner = s;
+    region = r;
+  }
+
+  public boolean isPrefetchSubmitted() {
+    return prefetchScanFuture != null;
+  }
+
+  public HRegionInfo getRegionInfo() {
+    return region.getRegionInfo();
+  }
+
+  /**
+   * Find the current prefetched result size
+   */
+  public long currentPrefetchedResultSize() {
+    return prefetchedResultSize;
+  }
+
+  /**
+   * Wait till current prefetching task complete,
+   * return true if any data retrieved, false otherwise.
+   * Used for unit testing only.
+   */
+  public boolean waitForPrefetchingDone() {
+    if (prefetchScanFuture != null) {
+      try {
+        ScanResult scanResult = prefetchScanFuture.get();
+        return scanResult != null && scanResult.results != null
+          && !scanResult.results.isEmpty();
+      } catch (Throwable t) {
+        LOG.debug("Got exception in getting scan result", t);
+        if (t instanceof InterruptedException) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Stop any prefetching task and close the scanner.
+   * @throws IOException
+   */
+  public void closeScanner() throws IOException {
+    // stop prefetcher if needed.
+    if (prefetchScanFuture != null) {
+      synchronized (prefetcher) {
+        prefetcher.scannerClosing = true;
+        prefetchScanFuture.cancel(false);
+      }
+      prefetchScanFuture = null;
+      if (prefetchedResultSize > 0) {
+        globalPrefetchedResultSize.addAndGet(-prefetchedResultSize);
+        prefetchedResultSize = 0L;
+      }
+    }
+    scanner.close();
+  }
+
+  /**
+   * Get the prefetched scan result, if any. Otherwise,
+   * do a scan synchronously and return the result, which
+   * may take some time. Region scan coprocessor, if specified,
+   * is invoked properly, which may override the scan result.
+   *
+   * @param rows the number of rows to scan, which is preferred
+   * not to change among scanner.next() calls.
+   *
+   * @return scan result, which has the data retrieved from
+   * the scanner, or some IOException if the scan failed.
+   * @throws IOException if failed to retrieve from the scanner.
+   */
+  public ScanResult getScanResult(final int rows) throws IOException {
+    Preconditions.checkArgument(rows > 0, "Number of rows requested must be positive");
+    ScanResult scanResult = null;
+    this.rows = rows;
+
+    if (prefetchScanFuture == null) {
+      // Need to scan inline if not prefetched
+      scanResult = prefetcher.call();
+    } else {
+      // if we have a prefetched result, then use it
+      try {
+        scanResult = prefetchScanFuture.get();
+        if (scanResult.moreResults) {
+          int prefetchedRows = scanResult.results.size();
+          if (prefetchedRows != 0 && this.rows > prefetchedRows) {
+            // Try to scan more since we haven't prefetched enough
+            this.rows -= prefetchedRows;
+            ScanResult tmp = prefetcher.call();
+            if (tmp.isException) {
+              return tmp; // Keep the prefetched results for later
+            }
+            if (tmp.results != null && !tmp.results.isEmpty()) {
+              // Merge new results to the old result list
+              scanResult.results.addAll(tmp.results);
+            }
+            // Reset rows for next prefetching
+            this.rows = rows;
+          }
+        }
+        prefetchScanFuture = null;
+        if (prefetchedResultSize > 0) {
+          globalPrefetchedResultSize.addAndGet(-prefetchedResultSize);
+          prefetchedResultSize = 0L;
+        }
+      } catch (ExecutionException ee) {
+        throw new IOException("failed to run prefetching task", ee.getCause());
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+        IOException iie = new InterruptedIOException("scan was interrupted");
+        iie.initCause(ie);
+        throw iie;
+      }
+    }
+
+    if (prefetching
+        && scanResult.moreResults && !scanResult.results.isEmpty()) {
+      long totalPrefetchedResultSize = globalPrefetchedResultSize.get();
+      if (totalPrefetchedResultSize < maxGlobalPrefetchedResultSize) {
+        // Schedule a background prefetch for the next result
+        // if prefetch is enabled on scans and there are more results
+        prefetchScanFuture = scanPrefetchThreadPool.submit(prefetcher);
+      } else if (LOG.isTraceEnabled()) {
+        LOG.trace("One prefetching is skipped for scanner " + scannerName
+          + " since total prefetched result size " + totalPrefetchedResultSize
+          + " is more than the maximum configured "
+          + maxGlobalPrefetchedResultSize);
+      }
+    }
+    return scanResult;
+  }
+
+  /**
+   * Set the rows to prefetch, and start the prefetching task.
+   */
+  public void enablePrefetching(int caching) {
+    if (caching > 0) {
+      rows = caching;
+    } else {
+      rows = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING,
+        HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
+    }
+    maxGlobalPrefetchedResultSize = conf.getLong(
+      MAX_PREFETCHED_RESULT_SIZE_KEY, MAX_PREFETCHED_RESULT_SIZE_DEFAULT);
+    if (globalPrefetchedResultSize.get() < maxGlobalPrefetchedResultSize) {
+      prefetchScanFuture = scanPrefetchThreadPool.submit(prefetcher);
+    }
+    prefetching = true;
+  }
+
+  /**
+   * This Callable abstracts calling a pre-fetch next. This is called on a
+   * threadpool. It makes a pre-fetch next call with the same parameters as
+   * the incoming next call. Note that the number of rows to return (nbRows)
+   * and/or the memory size for the result is the same as the previous call if
+   * pre-fetching is enabled. If these parameters change dynamically,
+   * they will take effect in the subsequent iteration.
+   */
+  class ScanPrefetcher implements Callable<ScanResult> {
+    boolean scannerClosing = false;
+
+    public ScanResult call() {
+      ScanResult scanResult = null;
+      Leases.Lease lease = null;
+      try {
+        // Remove lease while its being processed in server; protects against case
+        // where processing of request takes > lease expiration time.
+        lease = leases.removeLease(scannerName);
+        List<Result> results = new ArrayList<Result>(rows);
+        long currentScanResultSize = 0;
+        boolean moreResults = true;
+
+        boolean done = false;
+        long maxResultSize = scanner.getMaxResultSize();
+        if (maxResultSize <= 0) {
+          maxResultSize = maxScannerResultSize;
+        }
+        String threadName = Thread.currentThread().getName();
+        boolean prefetchingThread = threadName.startsWith(PREFETCHER_THREAD_PREFIX);
+        // Call coprocessor. Get region info from scanner.
+        if (region != null && region.getCoprocessorHost() != null) {
+          Boolean bypass = region.getCoprocessorHost().preScannerNext(
+            scanner, results, rows);
+          if (!results.isEmpty()
+              && (prefetchingThread || maxResultSize < Long.MAX_VALUE)) {
+            for (Result r : results) {
+              for (KeyValue kv : r.raw()) {
+                currentScanResultSize += kv.heapSize();
+              }
+            }
+          }
+          if (bypass != null && bypass.booleanValue()) {
+            done = true;
+          }
+        }
+
+        if (!done) {
+          List<KeyValue> values = new ArrayList<KeyValue>();
+          MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint());
+          region.startRegionOperation();
+          try {
+            int i = 0;
+            synchronized(scanner) {
+              for (; i < rows
+                  && currentScanResultSize < maxResultSize; i++) {
+                // Collect values to be returned here
+                boolean moreRows = scanner.nextRaw(values);
+                if (!values.isEmpty()) {
+                  if (prefetchingThread || maxResultSize < Long.MAX_VALUE){
+                    for (KeyValue kv : values) {
+                      currentScanResultSize += kv.heapSize();
+                    }
+                  }
+                  results.add(new Result(values));
+                }
+                if (!moreRows) {
+                  break;
+                }
+                values.clear();
+              }
+            }
+            region.readRequestsCount.add(i);
+          } finally {
+            region.closeRegionOperation();
+          }
+
+          // coprocessor postNext hook
+          if (region != null && region.getCoprocessorHost() != null) {
+            region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
+          }
+        }
+
+        // If the scanner's filter - if any - is done with the scan
+        // and wants to tell the client to stop the scan. This is done by passing
+        // a null result, and setting moreResults to false.
+        if (scanner.isFilterDone() && results.isEmpty()) {
+          moreResults = false;
+          results = null;
+        }
+        scanResult = new ScanResult(moreResults, results);
+        if (prefetchingThread && currentScanResultSize > 0) {
+          synchronized (prefetcher) {
+            if (!scannerClosing) {
+              globalPrefetchedResultSize.addAndGet(currentScanResultSize);
+              prefetchedResultSize = currentScanResultSize;
+            }
+          }
+        }
+      } catch (IOException e) {
+        // we should queue the exception as the result so that we can return
+        // this when the result is asked for
+        scanResult = new ScanResult(e);
+      } finally {
+        // We're done. On way out re-add the above removed lease.
+        // Adding resets expiration time on lease.
+        if (scanners.containsKey(scannerName)) {
+          if (lease != null) {
+            try {
+              leases.addLease(lease);
+            } catch (LeaseStillHeldException e) {
+              LOG.error("THIS SHOULD NOT HAPPEN", e);
+            }
+          }
+        }
+      }
+      return scanResult;
+    }
+  }
+}
+
+/**
+ * This class abstracts the results of a single scanner's result. It tracks
+ * the list of Result objects if the pre-fetch next was successful, and
+ * tracks the exception if the next failed.
+ */
+class ScanResult {
+  final boolean isException;
+  IOException ioException = null;
+
+  List<Result> results = null;
+  boolean moreResults = false;
+
+  public ScanResult(IOException ioException) {
+    this.ioException = ioException;
+    isException = true;
+  }
+
+  public ScanResult(boolean moreResults, List<Result> results) {
+    this.moreResults = moreResults;
+    this.results = results;
+    isException = false;
+  }
+}

Propchange: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerHolder.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java?rev=1486246&r1=1486245&r2=1486246&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java Fri May 24 22:51:22 2013
@@ -17,31 +17,41 @@
 package org.apache.hadoop.hbase.client;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTestConst;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
 import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
+import org.apache.hadoop.hbase.regionserver.RegionScannerHolder;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.After;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.junit.AfterClass;
-import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 /**
  * A client-side test, mostly testing scanners with various parameters.
  */
 @Category(MediumTests.class)
+@RunWith(Parameterized.class)
 public class TestScannersFromClientSide {
   private static final Log LOG = LogFactory.getLog(TestScannersFromClientSide.class);
 
@@ -51,6 +61,37 @@ public class TestScannersFromClientSide 
   private static byte [] QUALIFIER = Bytes.toBytes("testQualifier");
   private static byte [] VALUE = Bytes.toBytes("testValue");
 
+  private final boolean prefetching;
+  private long maxSize;
+
+  @Parameters
+  public static final Collection<Object[]> parameters() {
+    List<Object[]> prefetchings = new ArrayList<Object[]>();
+    prefetchings.add(new Object[] {Long.valueOf(-1)});
+    prefetchings.add(new Object[] {Long.valueOf(0)});
+    prefetchings.add(new Object[] {Long.valueOf(1)});
+    prefetchings.add(new Object[] {Long.valueOf(1024)});
+    return prefetchings;
+  }
+
+  public TestScannersFromClientSide(Long maxPrefetchedResultSize) {
+    this.maxSize = maxPrefetchedResultSize.longValue();
+    if (this.maxSize < 0) {
+      this.prefetching = false;
+    } else {
+      this.prefetching = true;
+      if (this.maxSize == 0) {
+        this.maxSize = RegionScannerHolder.MAX_PREFETCHED_RESULT_SIZE_DEFAULT;
+      } else {
+        MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+        for (JVMClusterUtil.RegionServerThread rst: cluster.getLiveRegionServerThreads()) {
+          Configuration conf = rst.getRegionServer().getConfiguration();
+          conf.setLong(RegionScannerHolder.MAX_PREFETCHED_RESULT_SIZE_KEY, maxSize);
+        }
+      }
+    }
+  }
+
   /**
    * @throws java.lang.Exception
    */
@@ -65,22 +106,9 @@ public class TestScannersFromClientSide 
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
     TEST_UTIL.shutdownMiniCluster();
-  }
-
-  /**
-   * @throws java.lang.Exception
-   */
-  @Before
-  public void setUp() throws Exception {
-    // Nothing to do.
-  }
-
-  /**
-   * @throws java.lang.Exception
-   */
-  @After
-  public void tearDown() throws Exception {
-    // Nothing to do.
+    long remainingPrefetchedSize = RegionScannerHolder.getPrefetchedResultSize();
+    assertEquals("All prefetched results should be gone",
+      0, remainingPrefetchedSize);
   }
 
   /**
@@ -89,8 +117,23 @@ public class TestScannersFromClientSide 
    * @throws Exception
    */
   @Test
+  public void testScanBatchWithDefaultCaching() throws Exception {
+    batchedScanWithCachingSpecified(-1);  // Using default caching which is 100
+  }
+
+    /**
+     * Test from client side for batch of scan
+     *
+     * @throws Exception
+     */
+    @Test
   public void testScanBatch() throws Exception {
-    byte [] TABLE = Bytes.toBytes("testScanBatch");
+      batchedScanWithCachingSpecified(1);
+  }
+
+  private void batchedScanWithCachingSpecified(int caching) throws Exception {
+    byte [] TABLE = Bytes.toBytes(
+      "testScanBatch-" + prefetching + "_" + maxSize + "_" + caching);
     byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 8);
 
     HTable ht = TEST_UTIL.createTable(TABLE, FAMILY);
@@ -99,7 +142,7 @@ public class TestScannersFromClientSide 
     Scan scan;
     Delete delete;
     Result result;
-    ResultScanner scanner;
+    ClientScanner scanner;
     boolean toLog = true;
     List<KeyValue> kvListExp;
 
@@ -124,8 +167,11 @@ public class TestScannersFromClientSide 
 
     // without batch
     scan = new Scan(ROW);
+    scan.setCaching(caching);
     scan.setMaxVersions();
-    scanner = ht.getScanner(scan);
+    scan.setPrefetching(prefetching);
+    scanner = (ClientScanner)ht.getScanner(scan);
+    verifyPrefetching(scanner);
 
     // c4:4, c5:5, c6:6, c7:7
     kvListExp = new ArrayList<KeyValue>();
@@ -135,12 +181,16 @@ public class TestScannersFromClientSide 
     kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[7], 7, VALUE));
     result = scanner.next();
     verifyResult(result, kvListExp, toLog, "Testing first batch of scan");
+    verifyPrefetching(scanner);
 
     // with batch
     scan = new Scan(ROW);
+    scan.setCaching(caching);
     scan.setMaxVersions();
     scan.setBatch(2);
-    scanner = ht.getScanner(scan);
+    scan.setPrefetching(prefetching);
+    scanner = (ClientScanner)ht.getScanner(scan);
+    verifyPrefetching(scanner);
 
     // First batch: c4:4, c5:5
     kvListExp = new ArrayList<KeyValue>();
@@ -148,6 +198,7 @@ public class TestScannersFromClientSide 
     kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[5], 5, VALUE));
     result = scanner.next();
     verifyResult(result, kvListExp, toLog, "Testing first batch of scan");
+    verifyPrefetching(scanner);
 
     // Second batch: c6:6, c7:7
     kvListExp = new ArrayList<KeyValue>();
@@ -155,7 +206,7 @@ public class TestScannersFromClientSide 
     kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[7], 7, VALUE));
     result = scanner.next();
     verifyResult(result, kvListExp, toLog, "Testing second batch of scan");
-
+    verifyPrefetching(scanner);
   }
 
   /**
@@ -165,7 +216,7 @@ public class TestScannersFromClientSide 
    */
   @Test
   public void testGetMaxResults() throws Exception {
-    byte [] TABLE = Bytes.toBytes("testGetMaxResults");
+    byte [] TABLE = Bytes.toBytes("testGetMaxResults-" + prefetching + "_" + maxSize);
     byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
     byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20);
 
@@ -285,7 +336,7 @@ public class TestScannersFromClientSide 
    */
   @Test
   public void testScanMaxResults() throws Exception {
-    byte [] TABLE = Bytes.toBytes("testScanLimit");
+    byte [] TABLE = Bytes.toBytes("testScanLimit-" + prefetching + "_" + maxSize);
     byte [][] ROWS = HTestConst.makeNAscii(ROW, 2);
     byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
     byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 10);
@@ -315,17 +366,19 @@ public class TestScannersFromClientSide 
     }
 
     scan = new Scan();
+    scan.setCaching(1);
+    scan.setPrefetching(prefetching);
     scan.setMaxResultsPerColumnFamily(4);
-    ResultScanner scanner = ht.getScanner(scan);
+    ClientScanner scanner = (ClientScanner)ht.getScanner(scan);
     kvListScan = new ArrayList<KeyValue>();
     while ((result = scanner.next()) != null) {
+      verifyPrefetching(scanner);
       for (KeyValue kv : result.list()) {
         kvListScan.add(kv);
       }
     }
     result = new Result(kvListScan);
     verifyResult(result, kvListExp, toLog, "Testing scan with maxResults");
-
   }
 
   /**
@@ -335,7 +388,7 @@ public class TestScannersFromClientSide 
    */
   @Test
   public void testGetRowOffset() throws Exception {
-    byte [] TABLE = Bytes.toBytes("testGetRowOffset");
+    byte [] TABLE = Bytes.toBytes("testGetRowOffset-" + prefetching + "_" + maxSize);
     byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
     byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20);
 
@@ -421,7 +474,47 @@ public class TestScannersFromClientSide 
     kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[5], 1, VALUE));
     verifyResult(result, kvListExp, toLog,
        "Testing offset + multiple CFs + maxResults");
+  }
 
+  /**
+   * For testing only, find a region scanner holder for a scan.
+   */
+  RegionScannerHolder findRegionScannerHolder(ClientScanner scanner) {
+    long scannerId = scanner.currentScannerId();
+    if (scannerId == -1L) return null;
+
+    HRegionInfo expectedRegion = scanner.currentRegionInfo();
+    MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+    for (JVMClusterUtil.RegionServerThread rst: cluster.getLiveRegionServerThreads()) {
+      RegionScannerHolder rsh = rst.getRegionServer().getScannerHolder(scannerId);
+      if (rsh != null && rsh.getRegionInfo().equals(expectedRegion)) {
+        return rsh;
+      }
+    }
+    return null;
+  }
+
+  void verifyPrefetching(ClientScanner scanner) throws IOException {
+    long scannerId = scanner.currentScannerId();
+    if (scannerId == -1L) return; // scanner is already closed
+    RegionScannerHolder rsh = findRegionScannerHolder(scanner);
+    assertNotNull("We should be able to find the scanner", rsh);
+    boolean isPrefetchSubmitted = rsh.isPrefetchSubmitted();
+    if (prefetching && (RegionScannerHolder.getPrefetchedResultSize() < this.maxSize)) {
+      assertTrue("Prefetching should be submitted or no more result",
+        isPrefetchSubmitted || scanner.next() == null);
+    } else if (isPrefetchSubmitted) {
+      // Prefetch submitted, it must be because prefetching is enabled,
+      // and there was still room before it's scheduled
+      long sizeBefore = RegionScannerHolder.getPrefetchedResultSize()
+        - rsh.currentPrefetchedResultSize();
+      assertTrue("There should have room before prefetching is submitted",
+        prefetching && sizeBefore < this.maxSize);
+    }
+    if (isPrefetchSubmitted && rsh.waitForPrefetchingDone()) {
+      assertTrue("Prefetched result size should not be 0",
+        rsh.currentPrefetchedResultSize() > 0);
+    }
   }
 
   static void verifyResult(Result result, List<KeyValue> expKvList, boolean toLog,
@@ -449,6 +542,4 @@ public class TestScannersFromClientSide 
 
     assertEquals(expKvList.size(), result.size());
   }
-
-
 }

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java?rev=1486246&r1=1486245&r2=1486246&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java Fri May 24 22:51:22 2013
@@ -58,7 +58,6 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.regionserver.BaseRowProcessor;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 
@@ -130,6 +129,7 @@ public class TestRowProcessorEndpoint {
       // ignore table not found
     }
     table = util.createTable(TABLE, FAM);
+    table.setAutoFlush(false);
     {
       Put put = new Put(ROW);
       put.add(FAM, A, Bytes.add(B, C));    // B, C are friends of A
@@ -143,6 +143,8 @@ public class TestRowProcessorEndpoint {
     put.add(FAM, F, G);
     table.put(put);
     row2Size = put.size();
+    table.clearRegionCache();
+    table.flushCommits();
   }
 
   @Test

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java?rev=1486246&r1=1486245&r2=1486246&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java Fri May 24 22:51:22 2013
@@ -284,6 +284,7 @@ public class TestProtobufUtil {
     scanBuilder = ClientProtos.Scan.newBuilder(proto);
     scanBuilder.setMaxVersions(1);
     scanBuilder.setCacheBlocks(true);
+    scanBuilder.setPrefetching(true);
 
     Scan scan = ProtobufUtil.toScan(proto);
     assertEquals(scanBuilder.build(), ProtobufUtil.toScan(scan));

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java?rev=1486246&r1=1486245&r2=1486246&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java Fri May 24 22:51:22 2013
@@ -335,6 +335,7 @@ public class TestRegionServerMetrics {
     Scan s = new Scan();
     s.setBatch(1);
     s.setCaching(1);
+    s.setPrefetching(false);
     ResultScanner resultScanners = t.getScanner(s);
 
     for (int nextCount = 0; nextCount < 30; nextCount++) {