Posted to commits@hbase.apache.org by mb...@apache.org on 2012/07/04 00:36:22 UTC

svn commit: r1356995 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/protobuf/ main/java/org/apache/hadoop/hbase/protobuf/generated/ main/java/org/apache/hadoop/hbase/regionserver/ main/pr...

Author: mbautin
Date: Tue Jul  3 22:36:20 2012
New Revision: 1356995

URL: http://svn.apache.org/viewvc?rev=1356995&view=rev
Log:
[jira] [HBASE-5104] Provide a reliable intra-row pagination mechanism

Summary: Port Madhu's patch for intra-row pagination (rHBASEEIGHTNINEFBBRANCH1326043) to trunk. This is what we have in 89-fb, taken as a starting point (there are currently test failures).

Test Plan: Run unit tests

Reviewers: madhuvaidya, lhofhansl, Kannan, tedyu, stack, todd, JIRA, jxcn01

Reviewed By: tedyu

CC: jxcn01, Liyin

Differential Revision: https://reviews.facebook.net/D2799
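
For reference, a hedged usage sketch of the API this patch adds (the row key and
settings below are hypothetical; the Get setters return this, so they chain, while
the Scan setters return void in this version):

    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    // Return at most 10 values per row per column family, skipping the first 20.
    Get get = new Get(Bytes.toBytes("myRow"))
        .setRowOffsetPerColumnFamily(20)
        .setMaxResultsPerColumnFamily(10);

    // The same pagination settings on a Scan.
    Scan scan = new Scan();
    scan.setRowOffsetPerColumnFamily(20);
    scan.setMaxResultsPerColumnFamily(10);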

Added:
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIntraRowPagination.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Get.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Scan.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
    hbase/trunk/hbase-server/src/main/protobuf/Client.proto
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/HTestConst.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Get.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Get.java?rev=1356995&r1=1356994&r2=1356995&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Get.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Get.java Tue Jul  3 22:36:20 2012
@@ -69,17 +69,30 @@ import java.util.TreeSet;
 @InterfaceStability.Stable
 public class Get extends OperationWithAttributes
   implements Writable, Row, Comparable<Row> {
-  private static final byte GET_VERSION = (byte)2;
+
+  private static final byte VERSION_WITHOUT_PAGINATION = (byte) 2;
+  private static final byte VERSION_WITH_PAGINATION = (byte) 3;
+  private static final byte GET_VERSION = VERSION_WITH_PAGINATION;
 
   private byte [] row = null;
   private long lockId = -1L;
   private int maxVersions = 1;
   private boolean cacheBlocks = true;
+  private int storeLimit = -1;
+  private int storeOffset = 0;
   private Filter filter = null;
   private TimeRange tr = new TimeRange();
   private Map<byte [], NavigableSet<byte []>> familyMap =
     new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR);
 
+  /** @return the most backward-compatible serialization version possible given this Get's parameters */
+  private byte getVersion() {
+    if (storeLimit != -1 || storeOffset != 0) {
+      return VERSION_WITH_PAGINATION;
+    }
+    return VERSION_WITHOUT_PAGINATION;
+  }
+
   /** Constructor for Writable.  DO NOT USE */
   public Get() {}
 
@@ -194,6 +207,27 @@ public class Get extends OperationWithAt
   }
 
   /**
+   * Set the maximum number of values to return per row per Column Family
+   * @param limit the maximum number of values returned per row per CF
+   * @return this for invocation chaining
+   */
+  public Get setMaxResultsPerColumnFamily(int limit) {
+    this.storeLimit = limit;
+    return this;
+  }
+
+  /**
+   * Set the offset within each row per Column Family. This offset applies only within a
+   * particular row/CF combination; it is reset to zero when we move to the next row or CF.
+   * @param offset the number of KVs that will be skipped
+   * @return this for invocation chaining
+   */
+  public Get setRowOffsetPerColumnFamily(int offset) {
+    this.storeOffset = offset;
+    return this;
+  }
+
+  /**
    * Apply the specified server-side filter when performing the Get.
    * Only {@link Filter#filterKeyValue(KeyValue)} is called AFTER all tests
    * for ttl, column match, deletes and max versions have been run.
@@ -270,6 +304,24 @@ public class Get extends OperationWithAt
   }
 
   /**
+   * Method for retrieving the get's maximum number of values
+   * to return per row per Column Family
+   * @return the maximum number of values to fetch per row per CF
+   */
+  public int getMaxResultsPerColumnFamily() {
+    return this.storeLimit;
+  }
+
+  /**
+   * Method for retrieving the get's offset per row per column
+   * family (#kvs to be skipped)
+   * @return the row offset
+   */
+  public int getRowOffsetPerColumnFamily() {
+    return this.storeOffset;
+  }
+
+  /**
    * Method for retrieving the get's TimeRange
    * @return timeRange
    */
@@ -399,6 +451,10 @@ public class Get extends OperationWithAt
     this.row = Bytes.readByteArray(in);
     this.lockId = in.readLong();
     this.maxVersions = in.readInt();
+    if (version >= VERSION_WITH_PAGINATION) {
+      this.storeLimit = in.readInt();
+      this.storeOffset = in.readInt();
+    }
     boolean hasFilter = in.readBoolean();
     if (hasFilter) {
       this.filter = (Filter)createForName(Bytes.toString(Bytes.readByteArray(in)));
@@ -429,10 +485,15 @@ public class Get extends OperationWithAt
 
   public void write(final DataOutput out)
   throws IOException {
-    out.writeByte(GET_VERSION);
+    byte version = getVersion();
+    out.writeByte(version);
     Bytes.writeByteArray(out, this.row);
     out.writeLong(this.lockId);
     out.writeInt(this.maxVersions);
+    if (version >= VERSION_WITH_PAGINATION) {
+      out.writeInt(this.storeLimit);
+      out.writeInt(this.storeOffset);
+    }
     if(this.filter == null) {
       out.writeBoolean(false);
     } else {
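
A note on the version handling above, since DataInput/Writable (unlike protobuf)
has no notion of optional fields: a pre-pagination reader would misparse a stream
that carries the two new ints, so write() now emits the lowest version that can
represent the instance. A minimal hedged sketch of the effect (Scan.java below
applies the same idea with two more version tiers):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.util.Bytes;

    Get get = new Get(Bytes.toBytes("r"));                 // hypothetical row key
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    get.write(new DataOutputStream(bos));                  // emits VERSION_WITHOUT_PAGINATION (2)
    get.setMaxResultsPerColumnFamily(5);
    bos.reset();
    get.write(new DataOutputStream(bos));                  // emits VERSION_WITH_PAGINATION (3)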

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Scan.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Scan.java?rev=1356995&r1=1356994&r2=1356995&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Scan.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Scan.java Tue Jul  3 22:36:20 2012
@@ -91,11 +91,20 @@ public class Scan extends OperationWithA
   private static final String RAW_ATTR = "_raw_";
   private static final String ISOLATION_LEVEL = "_isolationlevel_";
 
-  private static final byte SCAN_VERSION = (byte)3;
+  private static final byte VERSION_WITH_PAGINATION = (byte)4;
+  private static final byte VERSION_WITH_RESULT_SIZE = (byte)3;
+  private static final byte VERSION_WITH_ATTRIBUTES = (byte)2;
+  
+  private static final byte SCAN_VERSION = VERSION_WITH_PAGINATION;
+  
   private byte [] startRow = HConstants.EMPTY_START_ROW;
   private byte [] stopRow  = HConstants.EMPTY_END_ROW;
   private int maxVersions = 1;
   private int batch = -1;
+
+  private int storeLimit = -1;
+  private int storeOffset = 0;
+  
   // If application wants to collect scan metrics, it needs to
   // call scan.setAttribute(SCAN_ATTRIBUTES_ENABLE, Bytes.toBytes(Boolean.TRUE))
   static public String SCAN_ATTRIBUTES_METRICS_ENABLE =
@@ -115,6 +124,22 @@ public class Scan extends OperationWithA
     new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR);
 
   /**
+   * @return the most backward-compatible serialization version possible given this Scan's parameters
+   */
+  private byte getVersion() {
+    if (storeLimit != -1 || storeOffset != 0) {
+      return VERSION_WITH_PAGINATION;
+    }
+    if (maxResultSize != -1) { 
+      return VERSION_WITH_RESULT_SIZE;
+    }
+    if (getAttributeSize() != 0) {
+      return VERSION_WITH_ATTRIBUTES;
+    }
+    return 1;
+  }
+  
+  /**
    * Create a Scan operation across all rows.
    */
   public Scan() {}
@@ -156,6 +181,8 @@ public class Scan extends OperationWithA
     stopRow  = scan.getStopRow();
     maxVersions = scan.getMaxVersions();
     batch = scan.getBatch();
+    storeLimit = scan.getMaxResultsPerColumnFamily();
+    storeOffset = scan.getRowOffsetPerColumnFamily();
     caching = scan.getCaching();
     maxResultSize = scan.getMaxResultSize();
     cacheBlocks = scan.getCacheBlocks();
@@ -189,6 +216,8 @@ public class Scan extends OperationWithA
     this.filter = get.getFilter();
     this.cacheBlocks = get.getCacheBlocks();
     this.maxVersions = get.getMaxVersions();
+    this.storeLimit = get.getMaxResultsPerColumnFamily();
+    this.storeOffset = get.getRowOffsetPerColumnFamily();
     this.tr = get.getTimeRange();
     this.familyMap = get.getFamilyMap();
   }
@@ -324,6 +353,22 @@ public class Scan extends OperationWithA
   }
 
   /**
+   * Set the maximum number of values to return per row per Column Family
+   * @param limit the maximum number of values returned per row per CF
+   */
+  public void setMaxResultsPerColumnFamily(int limit) {
+    this.storeLimit = limit;
+  }
+
+  /**
+   * Set the offset within each row per Column Family; it is reset when we move to the next row or CF.
+   * @param offset the number of KVs that will be skipped
+   */
+  public void setRowOffsetPerColumnFamily(int offset) {
+    this.storeOffset = offset;
+  }
+
+  /**
    * Set the number of rows for caching that will be passed to scanners.
    * If not set, the default setting from {@link HTable#getScannerCaching()} will apply.
    * Higher caching values will enable faster scanners but will use more memory.
@@ -435,6 +480,22 @@ public class Scan extends OperationWithA
   }
 
   /**
+   * @return maximum number of values to return per row per CF
+   */
+  public int getMaxResultsPerColumnFamily() {
+    return this.storeLimit;
+  }
+
+  /**
+   * Method for retrieving the scan's offset per row per column
+   * family (#kvs to be skipped)
+   * @return row offset
+   */
+  public int getRowOffsetPerColumnFamily() {
+    return this.storeOffset;
+  }
+
+  /**
    * @return caching the number of rows fetched when calling next on a scanner
    */
   public int getCaching() {
@@ -591,6 +652,10 @@ public class Scan extends OperationWithA
     this.stopRow = Bytes.readByteArray(in);
     this.maxVersions = in.readInt();
     this.batch = in.readInt();
+    if (version >= VERSION_WITH_PAGINATION) {
+      this.storeLimit = in.readInt();
+      this.storeOffset = in.readInt();
+    }
     this.caching = in.readInt();
     this.cacheBlocks = in.readBoolean();
     if(in.readBoolean()) {
@@ -613,21 +678,26 @@ public class Scan extends OperationWithA
       this.familyMap.put(family, set);
     }
 
-    if (version > 1) {
+    if (version >= VERSION_WITH_ATTRIBUTES) {
       readAttributes(in);
     }
-    if (version > 2) {
+    if (version >= VERSION_WITH_RESULT_SIZE) {
       this.maxResultSize = in.readLong();
     }
   }
 
   public void write(final DataOutput out)
   throws IOException {
-    out.writeByte(SCAN_VERSION);
+    byte version = getVersion();
+    out.writeByte(version);
     Bytes.writeByteArray(out, this.startRow);
     Bytes.writeByteArray(out, this.stopRow);
     out.writeInt(this.maxVersions);
     out.writeInt(this.batch);
+    if (version >= VERSION_WITH_PAGINATION) {
+      out.writeInt(this.storeLimit);
+      out.writeInt(this.storeOffset);
+    }
     out.writeInt(this.caching);
     out.writeBoolean(this.cacheBlocks);
     if(this.filter == null) {
@@ -651,8 +721,12 @@ public class Scan extends OperationWithA
         out.writeInt(0);
       }
     }
-    writeAttributes(out);
-    out.writeLong(maxResultSize);
+    if (version >= VERSION_WITH_ATTRIBUTES) {
+      writeAttributes(out);
+    }
+    if (version >= VERSION_WITH_RESULT_SIZE) {
+      out.writeLong(maxResultSize);
+    }
   }
 
   /**

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java?rev=1356995&r1=1356994&r2=1356995&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java Tue Jul  3 22:36:20 2012
@@ -345,6 +345,12 @@ public final class ProtobufUtil {
     if (proto.hasMaxVersions()) {
       get.setMaxVersions(proto.getMaxVersions());
     }
+    if (proto.hasStoreLimit()) {
+      get.setMaxResultsPerColumnFamily(proto.getStoreLimit());
+    }
+    if (proto.hasStoreOffset()) {
+      get.setRowOffsetPerColumnFamily(proto.getStoreOffset());
+    }
     if (proto.hasTimeRange()) {
       HBaseProtos.TimeRange timeRange = proto.getTimeRange();
       long minStamp = 0;
@@ -612,6 +618,12 @@ public final class ProtobufUtil {
       }
       scanBuilder.addColumn(columnBuilder.build());
     }
+    if (scan.getMaxResultsPerColumnFamily() >= 0) {
+      scanBuilder.setStoreLimit(scan.getMaxResultsPerColumnFamily());
+    }
+    if (scan.getRowOffsetPerColumnFamily() > 0) {
+      scanBuilder.setStoreOffset(scan.getRowOffsetPerColumnFamily());
+    }
     return scanBuilder.build();
   }
 
@@ -639,6 +651,12 @@ public final class ProtobufUtil {
     if (proto.hasMaxVersions()) {
       scan.setMaxVersions(proto.getMaxVersions());
     }
+    if (proto.hasStoreLimit()) {
+      scan.setMaxResultsPerColumnFamily(proto.getStoreLimit());
+    }
+    if (proto.hasStoreOffset()) {
+      scan.setRowOffsetPerColumnFamily(proto.getStoreOffset());
+    }
     if (proto.hasTimeRange()) {
       HBaseProtos.TimeRange timeRange = proto.getTimeRange();
       long minStamp = 0;
@@ -770,6 +788,12 @@ public final class ProtobufUtil {
         builder.addColumn(columnBuilder.build());
       }
     }
+    if (get.getMaxResultsPerColumnFamily() >= 0) {
+      builder.setStoreLimit(get.getMaxResultsPerColumnFamily());
+    }
+    if (get.getRowOffsetPerColumnFamily() > 0) {
+      builder.setStoreOffset(get.getRowOffsetPerColumnFamily());
+    }
     return builder.build();
   }
 

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java?rev=1356995&r1=1356994&r2=1356995&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java Tue Jul  3 22:36:20 2012
@@ -549,6 +549,14 @@ public final class ClientProtos {
     // optional bool cacheBlocks = 8 [default = true];
     boolean hasCacheBlocks();
     boolean getCacheBlocks();
+
+    // optional uint32 storeLimit = 9;
+    boolean hasStoreLimit();
+    int getStoreLimit();
+
+    // optional uint32 storeOffset = 10;
+    boolean hasStoreOffset();
+    int getStoreOffset();
   }
   public static final class Get extends
       com.google.protobuf.GeneratedMessage
@@ -687,6 +695,26 @@ public final class ClientProtos {
       return cacheBlocks_;
     }
     
+    // optional uint32 storeLimit = 9;
+    public static final int STORELIMIT_FIELD_NUMBER = 9;
+    private int storeLimit_;
+    public boolean hasStoreLimit() {
+      return ((bitField0_ & 0x00000040) == 0x00000040);
+    }
+    public int getStoreLimit() {
+      return storeLimit_;
+    }
+
+    // optional uint32 storeOffset = 10;
+    public static final int STOREOFFSET_FIELD_NUMBER = 10;
+    private int storeOffset_;
+    public boolean hasStoreOffset() {
+      return ((bitField0_ & 0x00000080) == 0x00000080);
+    }
+    public int getStoreOffset() {
+      return storeOffset_;
+    }
+
     private void initFields() {
       row_ = com.google.protobuf.ByteString.EMPTY;
       column_ = java.util.Collections.emptyList();
@@ -696,6 +724,8 @@ public final class ClientProtos {
       timeRange_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TimeRange.getDefaultInstance();
       maxVersions_ = 1;
       cacheBlocks_ = true;
+      storeLimit_ = 0;
+      storeOffset_ = 0;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -755,6 +785,12 @@ public final class ClientProtos {
       if (((bitField0_ & 0x00000020) == 0x00000020)) {
         output.writeBool(8, cacheBlocks_);
       }
+      if (((bitField0_ & 0x00000040) == 0x00000040)) {
+        output.writeUInt32(9, storeLimit_);
+      }
+      if (((bitField0_ & 0x00000080) == 0x00000080)) {
+        output.writeUInt32(10, storeOffset_);
+      }
       getUnknownFields().writeTo(output);
     }
     
@@ -796,6 +832,14 @@ public final class ClientProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeBoolSize(8, cacheBlocks_);
       }
+      if (((bitField0_ & 0x00000040) == 0x00000040)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeUInt32Size(9, storeLimit_);
+      }
+      if (((bitField0_ & 0x00000080) == 0x00000080)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeUInt32Size(10, storeOffset_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -853,6 +897,16 @@ public final class ClientProtos {
         result = result && (getCacheBlocks()
             == other.getCacheBlocks());
       }
+      result = result && (hasStoreLimit() == other.hasStoreLimit());
+      if (hasStoreLimit()) {
+        result = result && (getStoreLimit()
+            == other.getStoreLimit());
+      }
+      result = result && (hasStoreOffset() == other.hasStoreOffset());
+      if (hasStoreOffset()) {
+        result = result && (getStoreOffset()
+            == other.getStoreOffset());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -894,6 +948,14 @@ public final class ClientProtos {
         hash = (37 * hash) + CACHEBLOCKS_FIELD_NUMBER;
         hash = (53 * hash) + hashBoolean(getCacheBlocks());
       }
+      if (hasStoreLimit()) {
+        hash = (37 * hash) + STORELIMIT_FIELD_NUMBER;
+        hash = (53 * hash) + getStoreLimit();
+      }
+      if (hasStoreOffset()) {
+        hash = (37 * hash) + STOREOFFSET_FIELD_NUMBER;
+        hash = (53 * hash) + getStoreOffset();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       return hash;
     }
@@ -1046,6 +1108,10 @@ public final class ClientProtos {
         bitField0_ = (bitField0_ & ~0x00000040);
         cacheBlocks_ = true;
         bitField0_ = (bitField0_ & ~0x00000080);
+        storeLimit_ = 0;
+        bitField0_ = (bitField0_ & ~0x00000100);
+        storeOffset_ = 0;
+        bitField0_ = (bitField0_ & ~0x00000200);
         return this;
       }
       
@@ -1134,6 +1200,14 @@ public final class ClientProtos {
           to_bitField0_ |= 0x00000020;
         }
         result.cacheBlocks_ = cacheBlocks_;
+        if (((from_bitField0_ & 0x00000100) == 0x00000100)) {
+          to_bitField0_ |= 0x00000040;
+        }
+        result.storeLimit_ = storeLimit_;
+        if (((from_bitField0_ & 0x00000200) == 0x00000200)) {
+          to_bitField0_ |= 0x00000080;
+        }
+        result.storeOffset_ = storeOffset_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -1220,6 +1294,12 @@ public final class ClientProtos {
         if (other.hasCacheBlocks()) {
           setCacheBlocks(other.getCacheBlocks());
         }
+        if (other.hasStoreLimit()) {
+          setStoreLimit(other.getStoreLimit());
+        }
+        if (other.hasStoreOffset()) {
+          setStoreOffset(other.getStoreOffset());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -1323,6 +1403,16 @@ public final class ClientProtos {
               cacheBlocks_ = input.readBool();
               break;
             }
+            case 72: {
+              bitField0_ |= 0x00000100;
+              storeLimit_ = input.readUInt32();
+              break;
+            }
+            case 80: {
+              bitField0_ |= 0x00000200;
+              storeOffset_ = input.readUInt32();
+              break;
+            }
           }
         }
       }
@@ -1968,6 +2058,48 @@ public final class ClientProtos {
         return this;
       }
       
+      // optional uint32 storeLimit = 9;
+      private int storeLimit_ ;
+      public boolean hasStoreLimit() {
+        return ((bitField0_ & 0x00000100) == 0x00000100);
+      }
+      public int getStoreLimit() {
+        return storeLimit_;
+      }
+      public Builder setStoreLimit(int value) {
+        bitField0_ |= 0x00000100;
+        storeLimit_ = value;
+        onChanged();
+        return this;
+      }
+      public Builder clearStoreLimit() {
+        bitField0_ = (bitField0_ & ~0x00000100);
+        storeLimit_ = 0;
+        onChanged();
+        return this;
+      }
+
+      // optional uint32 storeOffset = 10;
+      private int storeOffset_ ;
+      public boolean hasStoreOffset() {
+        return ((bitField0_ & 0x00000200) == 0x00000200);
+      }
+      public int getStoreOffset() {
+        return storeOffset_;
+      }
+      public Builder setStoreOffset(int value) {
+        bitField0_ |= 0x00000200;
+        storeOffset_ = value;
+        onChanged();
+        return this;
+      }
+      public Builder clearStoreOffset() {
+        bitField0_ = (bitField0_ & ~0x00000200);
+        storeOffset_ = 0;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:Get)
     }
     
@@ -8732,6 +8864,14 @@ public final class ClientProtos {
     // optional uint64 maxResultSize = 10;
     boolean hasMaxResultSize();
     long getMaxResultSize();
+
+    // optional uint32 storeLimit = 11;
+    boolean hasStoreLimit();
+    int getStoreLimit();
+
+    // optional uint32 storeOffset = 12;
+    boolean hasStoreOffset();
+    int getStoreOffset();
   }
   public static final class Scan extends
       com.google.protobuf.GeneratedMessage
@@ -8890,6 +9030,26 @@ public final class ClientProtos {
       return maxResultSize_;
     }
     
+    // optional uint32 storeLimit = 11;
+    public static final int STORELIMIT_FIELD_NUMBER = 11;
+    private int storeLimit_;
+    public boolean hasStoreLimit() {
+      return ((bitField0_ & 0x00000100) == 0x00000100);
+    }
+    public int getStoreLimit() {
+      return storeLimit_;
+    }
+
+    // optional uint32 storeOffset = 12;
+    public static final int STOREOFFSET_FIELD_NUMBER = 12;
+    private int storeOffset_;
+    public boolean hasStoreOffset() {
+      return ((bitField0_ & 0x00000200) == 0x00000200);
+    }
+    public int getStoreOffset() {
+      return storeOffset_;
+    }
+
     private void initFields() {
       column_ = java.util.Collections.emptyList();
       attribute_ = java.util.Collections.emptyList();
@@ -8901,6 +9061,8 @@ public final class ClientProtos {
       cacheBlocks_ = true;
       batchSize_ = 0;
       maxResultSize_ = 0L;
+      storeLimit_ = 0;
+      storeOffset_ = 0;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -8962,6 +9124,12 @@ public final class ClientProtos {
       if (((bitField0_ & 0x00000080) == 0x00000080)) {
         output.writeUInt64(10, maxResultSize_);
       }
+      if (((bitField0_ & 0x00000100) == 0x00000100)) {
+        output.writeUInt32(11, storeLimit_);
+      }
+      if (((bitField0_ & 0x00000200) == 0x00000200)) {
+        output.writeUInt32(12, storeOffset_);
+      }
       getUnknownFields().writeTo(output);
     }
     
@@ -9011,6 +9179,14 @@ public final class ClientProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeUInt64Size(10, maxResultSize_);
       }
+      if (((bitField0_ & 0x00000100) == 0x00000100)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeUInt32Size(11, storeLimit_);
+      }
+      if (((bitField0_ & 0x00000200) == 0x00000200)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeUInt32Size(12, storeOffset_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -9078,6 +9254,16 @@ public final class ClientProtos {
         result = result && (getMaxResultSize()
             == other.getMaxResultSize());
       }
+      result = result && (hasStoreLimit() == other.hasStoreLimit());
+      if (hasStoreLimit()) {
+        result = result && (getStoreLimit()
+            == other.getStoreLimit());
+      }
+      result = result && (hasStoreOffset() == other.hasStoreOffset());
+      if (hasStoreOffset()) {
+        result = result && (getStoreOffset()
+            == other.getStoreOffset());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -9127,6 +9313,14 @@ public final class ClientProtos {
         hash = (37 * hash) + MAXRESULTSIZE_FIELD_NUMBER;
         hash = (53 * hash) + hashLong(getMaxResultSize());
       }
+      if (hasStoreLimit()) {
+        hash = (37 * hash) + STORELIMIT_FIELD_NUMBER;
+        hash = (53 * hash) + getStoreLimit();
+      }
+      if (hasStoreOffset()) {
+        hash = (37 * hash) + STOREOFFSET_FIELD_NUMBER;
+        hash = (53 * hash) + getStoreOffset();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       return hash;
     }
@@ -9283,6 +9477,10 @@ public final class ClientProtos {
         bitField0_ = (bitField0_ & ~0x00000100);
         maxResultSize_ = 0L;
         bitField0_ = (bitField0_ & ~0x00000200);
+        storeLimit_ = 0;
+        bitField0_ = (bitField0_ & ~0x00000400);
+        storeOffset_ = 0;
+        bitField0_ = (bitField0_ & ~0x00000800);
         return this;
       }
       
@@ -9379,6 +9577,14 @@ public final class ClientProtos {
           to_bitField0_ |= 0x00000080;
         }
         result.maxResultSize_ = maxResultSize_;
+        if (((from_bitField0_ & 0x00000400) == 0x00000400)) {
+          to_bitField0_ |= 0x00000100;
+        }
+        result.storeLimit_ = storeLimit_;
+        if (((from_bitField0_ & 0x00000800) == 0x00000800)) {
+          to_bitField0_ |= 0x00000200;
+        }
+        result.storeOffset_ = storeOffset_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -9471,6 +9677,12 @@ public final class ClientProtos {
         if (other.hasMaxResultSize()) {
           setMaxResultSize(other.getMaxResultSize());
         }
+        if (other.hasStoreLimit()) {
+          setStoreLimit(other.getStoreLimit());
+        }
+        if (other.hasStoreOffset()) {
+          setStoreOffset(other.getStoreOffset());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -9580,6 +9792,16 @@ public final class ClientProtos {
               maxResultSize_ = input.readUInt64();
               break;
             }
+            case 88: {
+              bitField0_ |= 0x00000400;
+              storeLimit_ = input.readUInt32();
+              break;
+            }
+            case 96: {
+              bitField0_ |= 0x00000800;
+              storeOffset_ = input.readUInt32();
+              break;
+            }
           }
         }
       }
@@ -10270,6 +10492,48 @@ public final class ClientProtos {
         return this;
       }
       
+      // optional uint32 storeLimit = 11;
+      private int storeLimit_ ;
+      public boolean hasStoreLimit() {
+        return ((bitField0_ & 0x00000400) == 0x00000400);
+      }
+      public int getStoreLimit() {
+        return storeLimit_;
+      }
+      public Builder setStoreLimit(int value) {
+        bitField0_ |= 0x00000400;
+        storeLimit_ = value;
+        onChanged();
+        return this;
+      }
+      public Builder clearStoreLimit() {
+        bitField0_ = (bitField0_ & ~0x00000400);
+        storeLimit_ = 0;
+        onChanged();
+        return this;
+      }
+
+      // optional uint32 storeOffset = 12;
+      private int storeOffset_ ;
+      public boolean hasStoreOffset() {
+        return ((bitField0_ & 0x00000800) == 0x00000800);
+      }
+      public int getStoreOffset() {
+        return storeOffset_;
+      }
+      public Builder setStoreOffset(int value) {
+        bitField0_ |= 0x00000800;
+        storeOffset_ = value;
+        onChanged();
+        return this;
+      }
+      public Builder clearStoreOffset() {
+        bitField0_ = (bitField0_ & ~0x00000800);
+        storeOffset_ = 0;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:Scan)
     }
     
@@ -21385,92 +21649,94 @@ public final class ClientProtos {
   static {
     java.lang.String[] descriptorData = {
       "\n\014Client.proto\032\013hbase.proto\"+\n\006Column\022\016\n" +
-      "\006family\030\001 \002(\014\022\021\n\tqualifier\030\002 \003(\014\"\320\001\n\003Get" +
+      "\006family\030\001 \002(\014\022\021\n\tqualifier\030\002 \003(\014\"\371\001\n\003Get" +
       "\022\013\n\003row\030\001 \002(\014\022\027\n\006column\030\002 \003(\0132\007.Column\022!" +
       "\n\tattribute\030\003 \003(\0132\016.NameBytesPair\022\016\n\006loc" +
       "kId\030\004 \001(\004\022\036\n\006filter\030\005 \001(\0132\016.NameBytesPai" +
       "r\022\035\n\ttimeRange\030\006 \001(\0132\n.TimeRange\022\026\n\013maxV" +
       "ersions\030\007 \001(\r:\0011\022\031\n\013cacheBlocks\030\010 \001(\010:\004t" +
-      "rue\"\037\n\006Result\022\025\n\rkeyValueBytes\030\001 \003(\014\"r\n\n" +
-      "GetRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpeci" +
-      "fier\022\021\n\003get\030\002 \002(\0132\004.Get\022\030\n\020closestRowBef",
-      "ore\030\003 \001(\010\022\025\n\rexistenceOnly\030\004 \001(\010\"6\n\013GetR" +
-      "esponse\022\027\n\006result\030\001 \001(\0132\007.Result\022\016\n\006exis" +
-      "ts\030\002 \001(\010\"\200\002\n\tCondition\022\013\n\003row\030\001 \002(\014\022\016\n\006f" +
-      "amily\030\002 \002(\014\022\021\n\tqualifier\030\003 \002(\014\022+\n\013compar" +
-      "eType\030\004 \002(\0162\026.Condition.CompareType\022\"\n\nc" +
-      "omparator\030\005 \002(\0132\016.NameBytesPair\"r\n\013Compa" +
-      "reType\022\010\n\004LESS\020\000\022\021\n\rLESS_OR_EQUAL\020\001\022\t\n\005E" +
-      "QUAL\020\002\022\r\n\tNOT_EQUAL\020\003\022\024\n\020GREATER_OR_EQUA" +
-      "L\020\004\022\013\n\007GREATER\020\005\022\t\n\005NO_OP\020\006\"\306\004\n\006Mutate\022\013" +
-      "\n\003row\030\001 \002(\014\022&\n\nmutateType\030\002 \002(\0162\022.Mutate",
-      ".MutateType\022(\n\013columnValue\030\003 \003(\0132\023.Mutat" +
-      "e.ColumnValue\022!\n\tattribute\030\004 \003(\0132\016.NameB" +
-      "ytesPair\022\021\n\ttimestamp\030\005 \001(\004\022\016\n\006lockId\030\006 " +
-      "\001(\004\022\030\n\nwriteToWAL\030\007 \001(\010:\004true\022\035\n\ttimeRan" +
-      "ge\030\n \001(\0132\n.TimeRange\032\310\001\n\013ColumnValue\022\016\n\006" +
-      "family\030\001 \002(\014\022:\n\016qualifierValue\030\002 \003(\0132\".M" +
-      "utate.ColumnValue.QualifierValue\032m\n\016Qual" +
-      "ifierValue\022\021\n\tqualifier\030\001 \001(\014\022\r\n\005value\030\002" +
-      " \001(\014\022\021\n\ttimestamp\030\003 \001(\004\022&\n\ndeleteType\030\004 " +
-      "\001(\0162\022.Mutate.DeleteType\"<\n\nMutateType\022\n\n",
-      "\006APPEND\020\000\022\r\n\tINCREMENT\020\001\022\007\n\003PUT\020\002\022\n\n\006DEL" +
-      "ETE\020\003\"U\n\nDeleteType\022\026\n\022DELETE_ONE_VERSIO" +
-      "N\020\000\022\034\n\030DELETE_MULTIPLE_VERSIONS\020\001\022\021\n\rDEL" +
-      "ETE_FAMILY\020\002\"i\n\rMutateRequest\022 \n\006region\030" +
-      "\001 \002(\0132\020.RegionSpecifier\022\027\n\006mutate\030\002 \002(\0132" +
-      "\007.Mutate\022\035\n\tcondition\030\003 \001(\0132\n.Condition\"" +
-      "<\n\016MutateResponse\022\027\n\006result\030\001 \001(\0132\007.Resu" +
-      "lt\022\021\n\tprocessed\030\002 \001(\010\"\201\002\n\004Scan\022\027\n\006column" +
-      "\030\001 \003(\0132\007.Column\022!\n\tattribute\030\002 \003(\0132\016.Nam" +
-      "eBytesPair\022\020\n\010startRow\030\003 \001(\014\022\017\n\007stopRow\030",
-      "\004 \001(\014\022\036\n\006filter\030\005 \001(\0132\016.NameBytesPair\022\035\n" +
-      "\ttimeRange\030\006 \001(\0132\n.TimeRange\022\026\n\013maxVersi" +
-      "ons\030\007 \001(\r:\0011\022\031\n\013cacheBlocks\030\010 \001(\010:\004true\022" +
-      "\021\n\tbatchSize\030\t \001(\r\022\025\n\rmaxResultSize\030\n \001(" +
-      "\004\"\203\001\n\013ScanRequest\022 \n\006region\030\001 \001(\0132\020.Regi" +
-      "onSpecifier\022\023\n\004scan\030\002 \001(\0132\005.Scan\022\021\n\tscan" +
-      "nerId\030\003 \001(\004\022\024\n\014numberOfRows\030\004 \001(\r\022\024\n\014clo" +
-      "seScanner\030\005 \001(\010\"\\\n\014ScanResponse\022\027\n\006resul" +
-      "t\030\001 \003(\0132\007.Result\022\021\n\tscannerId\030\002 \001(\004\022\023\n\013m" +
-      "oreResults\030\003 \001(\010\022\013\n\003ttl\030\004 \001(\r\"?\n\016LockRow",
-      "Request\022 \n\006region\030\001 \002(\0132\020.RegionSpecifie" +
-      "r\022\013\n\003row\030\002 \003(\014\".\n\017LockRowResponse\022\016\n\006loc" +
-      "kId\030\001 \002(\004\022\013\n\003ttl\030\002 \001(\r\"D\n\020UnlockRowReque" +
-      "st\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\016\n\006" +
-      "lockId\030\002 \002(\004\"\023\n\021UnlockRowResponse\"\232\001\n\024Bu" +
-      "lkLoadHFileRequest\022 \n\006region\030\001 \002(\0132\020.Reg" +
-      "ionSpecifier\0224\n\nfamilyPath\030\002 \003(\0132 .BulkL" +
-      "oadHFileRequest.FamilyPath\032*\n\nFamilyPath" +
-      "\022\016\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLo" +
-      "adHFileResponse\022\016\n\006loaded\030\001 \002(\010\"\203\001\n\004Exec",
-      "\022\013\n\003row\030\001 \002(\014\022\024\n\014protocolName\030\002 \002(\t\022\022\n\nm" +
-      "ethodName\030\003 \002(\t\022!\n\010property\030\004 \003(\0132\017.Name" +
-      "StringPair\022!\n\tparameter\030\005 \003(\0132\016.NameByte" +
-      "sPair\"O\n\026ExecCoprocessorRequest\022 \n\006regio" +
-      "n\030\001 \002(\0132\020.RegionSpecifier\022\023\n\004call\030\002 \002(\0132" +
-      "\005.Exec\"8\n\027ExecCoprocessorResponse\022\035\n\005val" +
-      "ue\030\001 \002(\0132\016.NameBytesPair\"N\n\013MultiAction\022" +
-      "\027\n\006mutate\030\001 \001(\0132\007.Mutate\022\021\n\003get\030\002 \001(\0132\004." +
-      "Get\022\023\n\004exec\030\003 \001(\0132\005.Exec\"P\n\014ActionResult" +
-      "\022\035\n\005value\030\001 \001(\0132\016.NameBytesPair\022!\n\texcep",
-      "tion\030\002 \001(\0132\016.NameBytesPair\"^\n\014MultiReque" +
-      "st\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\034\n\006" +
-      "action\030\002 \003(\0132\014.MultiAction\022\016\n\006atomic\030\003 \001" +
-      "(\010\".\n\rMultiResponse\022\035\n\006result\030\001 \003(\0132\r.Ac" +
-      "tionResult2\221\003\n\rClientService\022 \n\003get\022\013.Ge" +
-      "tRequest\032\014.GetResponse\022)\n\006mutate\022\016.Mutat" +
-      "eRequest\032\017.MutateResponse\022#\n\004scan\022\014.Scan" +
-      "Request\032\r.ScanResponse\022,\n\007lockRow\022\017.Lock" +
-      "RowRequest\032\020.LockRowResponse\0222\n\tunlockRo" +
-      "w\022\021.UnlockRowRequest\032\022.UnlockRowResponse",
-      "\022>\n\rbulkLoadHFile\022\025.BulkLoadHFileRequest" +
-      "\032\026.BulkLoadHFileResponse\022D\n\017execCoproces" +
-      "sor\022\027.ExecCoprocessorRequest\032\030.ExecCopro" +
-      "cessorResponse\022&\n\005multi\022\r.MultiRequest\032\016" +
-      ".MultiResponseBB\n*org.apache.hadoop.hbas" +
-      "e.protobuf.generatedB\014ClientProtosH\001\210\001\001\240" +
-      "\001\001"
+      "rue\022\022\n\nstoreLimit\030\t \001(\r\022\023\n\013storeOffset\030\n" +
+      " \001(\r\"\037\n\006Result\022\025\n\rkeyValueBytes\030\001 \003(\014\"r\n" +
+      "\nGetRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpec",
+      "ifier\022\021\n\003get\030\002 \002(\0132\004.Get\022\030\n\020closestRowBe" +
+      "fore\030\003 \001(\010\022\025\n\rexistenceOnly\030\004 \001(\010\"6\n\013Get" +
+      "Response\022\027\n\006result\030\001 \001(\0132\007.Result\022\016\n\006exi" +
+      "sts\030\002 \001(\010\"\200\002\n\tCondition\022\013\n\003row\030\001 \002(\014\022\016\n\006" +
+      "family\030\002 \002(\014\022\021\n\tqualifier\030\003 \002(\014\022+\n\013compa" +
+      "reType\030\004 \002(\0162\026.Condition.CompareType\022\"\n\n" +
+      "comparator\030\005 \002(\0132\016.NameBytesPair\"r\n\013Comp" +
+      "areType\022\010\n\004LESS\020\000\022\021\n\rLESS_OR_EQUAL\020\001\022\t\n\005" +
+      "EQUAL\020\002\022\r\n\tNOT_EQUAL\020\003\022\024\n\020GREATER_OR_EQU" +
+      "AL\020\004\022\013\n\007GREATER\020\005\022\t\n\005NO_OP\020\006\"\306\004\n\006Mutate\022",
+      "\013\n\003row\030\001 \002(\014\022&\n\nmutateType\030\002 \002(\0162\022.Mutat" +
+      "e.MutateType\022(\n\013columnValue\030\003 \003(\0132\023.Muta" +
+      "te.ColumnValue\022!\n\tattribute\030\004 \003(\0132\016.Name" +
+      "BytesPair\022\021\n\ttimestamp\030\005 \001(\004\022\016\n\006lockId\030\006" +
+      " \001(\004\022\030\n\nwriteToWAL\030\007 \001(\010:\004true\022\035\n\ttimeRa" +
+      "nge\030\n \001(\0132\n.TimeRange\032\310\001\n\013ColumnValue\022\016\n" +
+      "\006family\030\001 \002(\014\022:\n\016qualifierValue\030\002 \003(\0132\"." +
+      "Mutate.ColumnValue.QualifierValue\032m\n\016Qua" +
+      "lifierValue\022\021\n\tqualifier\030\001 \001(\014\022\r\n\005value\030" +
+      "\002 \001(\014\022\021\n\ttimestamp\030\003 \001(\004\022&\n\ndeleteType\030\004",
+      " \001(\0162\022.Mutate.DeleteType\"<\n\nMutateType\022\n" +
+      "\n\006APPEND\020\000\022\r\n\tINCREMENT\020\001\022\007\n\003PUT\020\002\022\n\n\006DE" +
+      "LETE\020\003\"U\n\nDeleteType\022\026\n\022DELETE_ONE_VERSI" +
+      "ON\020\000\022\034\n\030DELETE_MULTIPLE_VERSIONS\020\001\022\021\n\rDE" +
+      "LETE_FAMILY\020\002\"i\n\rMutateRequest\022 \n\006region" +
+      "\030\001 \002(\0132\020.RegionSpecifier\022\027\n\006mutate\030\002 \002(\013" +
+      "2\007.Mutate\022\035\n\tcondition\030\003 \001(\0132\n.Condition" +
+      "\"<\n\016MutateResponse\022\027\n\006result\030\001 \001(\0132\007.Res" +
+      "ult\022\021\n\tprocessed\030\002 \001(\010\"\252\002\n\004Scan\022\027\n\006colum" +
+      "n\030\001 \003(\0132\007.Column\022!\n\tattribute\030\002 \003(\0132\016.Na",
+      "meBytesPair\022\020\n\010startRow\030\003 \001(\014\022\017\n\007stopRow" +
+      "\030\004 \001(\014\022\036\n\006filter\030\005 \001(\0132\016.NameBytesPair\022\035" +
+      "\n\ttimeRange\030\006 \001(\0132\n.TimeRange\022\026\n\013maxVers" +
+      "ions\030\007 \001(\r:\0011\022\031\n\013cacheBlocks\030\010 \001(\010:\004true" +
+      "\022\021\n\tbatchSize\030\t \001(\r\022\025\n\rmaxResultSize\030\n \001" +
+      "(\004\022\022\n\nstoreLimit\030\013 \001(\r\022\023\n\013storeOffset\030\014 " +
+      "\001(\r\"\203\001\n\013ScanRequest\022 \n\006region\030\001 \001(\0132\020.Re" +
+      "gionSpecifier\022\023\n\004scan\030\002 \001(\0132\005.Scan\022\021\n\tsc" +
+      "annerId\030\003 \001(\004\022\024\n\014numberOfRows\030\004 \001(\r\022\024\n\014c" +
+      "loseScanner\030\005 \001(\010\"\\\n\014ScanResponse\022\027\n\006res",
+      "ult\030\001 \003(\0132\007.Result\022\021\n\tscannerId\030\002 \001(\004\022\023\n" +
+      "\013moreResults\030\003 \001(\010\022\013\n\003ttl\030\004 \001(\r\"?\n\016LockR" +
+      "owRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpecif" +
+      "ier\022\013\n\003row\030\002 \003(\014\".\n\017LockRowResponse\022\016\n\006l" +
+      "ockId\030\001 \002(\004\022\013\n\003ttl\030\002 \001(\r\"D\n\020UnlockRowReq" +
+      "uest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\016" +
+      "\n\006lockId\030\002 \002(\004\"\023\n\021UnlockRowResponse\"\232\001\n\024" +
+      "BulkLoadHFileRequest\022 \n\006region\030\001 \002(\0132\020.R" +
+      "egionSpecifier\0224\n\nfamilyPath\030\002 \003(\0132 .Bul" +
+      "kLoadHFileRequest.FamilyPath\032*\n\nFamilyPa",
+      "th\022\016\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025Bulk" +
+      "LoadHFileResponse\022\016\n\006loaded\030\001 \002(\010\"\203\001\n\004Ex" +
+      "ec\022\013\n\003row\030\001 \002(\014\022\024\n\014protocolName\030\002 \002(\t\022\022\n" +
+      "\nmethodName\030\003 \002(\t\022!\n\010property\030\004 \003(\0132\017.Na" +
+      "meStringPair\022!\n\tparameter\030\005 \003(\0132\016.NameBy" +
+      "tesPair\"O\n\026ExecCoprocessorRequest\022 \n\006reg" +
+      "ion\030\001 \002(\0132\020.RegionSpecifier\022\023\n\004call\030\002 \002(" +
+      "\0132\005.Exec\"8\n\027ExecCoprocessorResponse\022\035\n\005v" +
+      "alue\030\001 \002(\0132\016.NameBytesPair\"N\n\013MultiActio" +
+      "n\022\027\n\006mutate\030\001 \001(\0132\007.Mutate\022\021\n\003get\030\002 \001(\0132",
+      "\004.Get\022\023\n\004exec\030\003 \001(\0132\005.Exec\"P\n\014ActionResu" +
+      "lt\022\035\n\005value\030\001 \001(\0132\016.NameBytesPair\022!\n\texc" +
+      "eption\030\002 \001(\0132\016.NameBytesPair\"^\n\014MultiReq" +
+      "uest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\034" +
+      "\n\006action\030\002 \003(\0132\014.MultiAction\022\016\n\006atomic\030\003" +
+      " \001(\010\".\n\rMultiResponse\022\035\n\006result\030\001 \003(\0132\r." +
+      "ActionResult2\221\003\n\rClientService\022 \n\003get\022\013." +
+      "GetRequest\032\014.GetResponse\022)\n\006mutate\022\016.Mut" +
+      "ateRequest\032\017.MutateResponse\022#\n\004scan\022\014.Sc" +
+      "anRequest\032\r.ScanResponse\022,\n\007lockRow\022\017.Lo",
+      "ckRowRequest\032\020.LockRowResponse\0222\n\tunlock" +
+      "Row\022\021.UnlockRowRequest\032\022.UnlockRowRespon" +
+      "se\022>\n\rbulkLoadHFile\022\025.BulkLoadHFileReque" +
+      "st\032\026.BulkLoadHFileResponse\022D\n\017execCoproc" +
+      "essor\022\027.ExecCoprocessorRequest\032\030.ExecCop" +
+      "rocessorResponse\022&\n\005multi\022\r.MultiRequest" +
+      "\032\016.MultiResponseBB\n*org.apache.hadoop.hb" +
+      "ase.protobuf.generatedB\014ClientProtosH\001\210\001" +
+      "\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -21490,7 +21756,7 @@ public final class ClientProtos {
           internal_static_Get_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_Get_descriptor,
-              new java.lang.String[] { "Row", "Column", "Attribute", "LockId", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", },
+              new java.lang.String[] { "Row", "Column", "Attribute", "LockId", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "StoreLimit", "StoreOffset", },
               org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Get.class,
               org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Get.Builder.class);
           internal_static_Result_descriptor =
@@ -21570,7 +21836,7 @@ public final class ClientProtos {
           internal_static_Scan_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_Scan_descriptor,
-              new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", },
+              new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", },
               org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.class,
               org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.Builder.class);
           internal_static_ScanRequest_descriptor =
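
On the protobuf side the new fields are plain optionals, so field presence is
explicit on the wire; note also the sentinel asymmetry in ProtobufUtil above
(storeLimit is copied when >= 0, since -1 is the client-side "no limit" value,
while storeOffset is copied only when > 0, its default). A quick hedged check
against the generated API shown in this diff:

    ClientProtos.Scan proto = ClientProtos.Scan.newBuilder().build();
    assert !proto.hasStoreLimit() && !proto.hasStoreOffset();  // unset by default

    proto = ClientProtos.Scan.newBuilder()
        .setStoreLimit(10)
        .setStoreOffset(20)
        .build();
    assert proto.getStoreLimit() == 10 && proto.getStoreOffset() == 20;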

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=1356995&r1=1356994&r2=1356995&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Tue Jul  3 22:36:20 2012
@@ -51,6 +51,9 @@ class StoreScanner extends NonLazyKeyVal
   private KeyValueHeap heap;
   private boolean cacheBlocks;
 
+  private int countPerRow = 0;
+  private int storeLimit = -1;
+  private int storeOffset = 0;
 
   private String metricNamePrefix;
   // Used to indicate that the scanner has closed (see HBASE-1107)
@@ -133,6 +136,12 @@ class StoreScanner extends NonLazyKeyVal
       }
     }
 
+    // set storeLimit
+    this.storeLimit = scan.getMaxResultsPerColumnFamily();
+
+    // set storeOffset (the row offset per CF)
+    this.storeOffset = scan.getRowOffsetPerColumnFamily();
+
     // Combine all seeked scanners with a heap
     heap = new KeyValueHeap(scanners, store.comparator);
 
@@ -342,6 +351,7 @@ class StoreScanner extends NonLazyKeyVal
     // only call setRow if the row changes; avoids confusing the query matcher
     // if scanning intra-row
     if ((matcher.row == null) || !peeked.matchingRow(matcher.row)) {
+      this.countPerRow = 0;
       matcher.setRow(peeked.getRow());
     }
 
@@ -371,11 +381,27 @@ class StoreScanner extends NonLazyKeyVal
           if (f != null) {
             kv = f.transform(kv);
           }
-          results.add(kv);
 
-          if (metric != null) {
-            RegionMetricsStorage.incrNumericMetric(this.metricNamePrefix + metric,
+          this.countPerRow++;
+          if (storeLimit > -1 &&
+              this.countPerRow > (storeLimit + storeOffset)) {
+            // do what SEEK_NEXT_ROW does.
+            if (!matcher.moreRowsMayExistAfter(kv)) {
+              outResult.addAll(results);
+              return false;
+            }
+            reseek(matcher.getKeyForNextRow(kv));
+            break LOOP;
+          }
+
+          // add to results only if we have skipped #storeOffset kvs
+          // also update metric accordingly
+          if (this.countPerRow > storeOffset) {
+            if (metric != null) {
+              RegionMetricsStorage.incrNumericMetric(this.metricNamePrefix + metric,
                 kv.getLength());
+            }
+            results.add(kv);
           }
 
           if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
@@ -533,6 +559,7 @@ class StoreScanner extends NonLazyKeyVal
       kv = lastTopKey;
     }
     if ((matcher.row == null) || !kv.matchingRow(matcher.row)) {
+      this.countPerRow = 0;
       matcher.reset();
       matcher.setRow(kv.getRow());
     }
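
The gist of the StoreScanner change: countPerRow counts the KVs that matched for
the current row/CF, the first storeOffset of them are skipped, the next storeLimit
are returned, and once the window is exhausted the scanner reseeks straight to the
next row instead of reading the rest. A standalone sketch of that windowing,
outside HBase (the reseek shortcut is reduced to a break here):

    import java.util.ArrayList;
    import java.util.List;

    // Mirrors the countPerRow / storeOffset / storeLimit logic in StoreScanner.next()
    // for the already-matched values of a single row/CF.
    static <T> List<T> window(List<T> rowValues, int storeOffset, int storeLimit) {
      List<T> out = new ArrayList<T>();
      int countPerRow = 0;
      for (T v : rowValues) {
        countPerRow++;
        if (storeLimit > -1 && countPerRow > storeLimit + storeOffset) {
          break;  // StoreScanner instead reseeks to the next row
        }
        if (countPerRow > storeOffset) {
          out.add(v);  // only added once storeOffset values have been skipped
        }
      }
      return out;
    }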

Modified: hbase/trunk/hbase-server/src/main/protobuf/Client.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/protobuf/Client.proto?rev=1356995&r1=1356994&r2=1356995&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/protobuf/Client.proto (original)
+++ hbase/trunk/hbase-server/src/main/protobuf/Client.proto Tue Jul  3 22:36:20 2012
@@ -46,6 +46,8 @@ message Get {
   optional TimeRange timeRange = 6;
   optional uint32 maxVersions = 7 [default = 1];
   optional bool cacheBlocks = 8 [default = true];
+  optional uint32 storeLimit = 9;
+  optional uint32 storeOffset = 10;
 }
 
 /**
@@ -195,6 +197,8 @@ message Scan {
   optional bool cacheBlocks = 8 [default = true];
   optional uint32 batchSize = 9;
   optional uint64 maxResultSize = 10;
+  optional uint32 storeLimit = 11;
+  optional uint32 storeOffset = 12;
 }
 
 /**

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/HTestConst.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/HTestConst.java?rev=1356995&r1=1356994&r2=1356995&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/HTestConst.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/HTestConst.java Tue Jul  3 22:36:20 2012
@@ -23,7 +23,10 @@ import java.util.Collections;
 
 import org.apache.hadoop.hbase.util.Bytes;
 
-/** Similar to {@link HConstants} but for tests. */
+/**
+ * Similar to {@link HConstants} but for tests. Also provides some simple
+ * static utility functions to generate test data.
+ */
 public class HTestConst {
 
   private HTestConst() {
@@ -39,4 +42,26 @@ public class HTestConst {
       Collections.unmodifiableSet(new HashSet<String>(
           Arrays.asList(new String[] { DEFAULT_CF_STR })));
 
+  public static final String DEFAULT_ROW_STR = "MyTestRow";
+  public static final byte[] DEFAULT_ROW_BYTES = Bytes.toBytes(DEFAULT_ROW_STR);
+
+  public static final String DEFAULT_QUALIFIER_STR = "MyColumnQualifier";
+  public static final byte[] DEFAULT_QUALIFIER_BYTES = Bytes.toBytes(DEFAULT_QUALIFIER_STR);
+
+  public static final String DEFAULT_VALUE_STR = "MyTestValue";
+  public static final byte[] DEFAULT_VALUE_BYTES = Bytes.toBytes(DEFAULT_VALUE_STR);
+
+  /**
+   * Generate the given number of unique byte sequences by appending numeric
+   * suffixes (ASCII representations of decimal numbers).
+   */
+  public static byte[][] makeNAscii(byte[] base, int n) {
+    byte [][] ret = new byte[n][];
+    for (int i = 0; i < n; i++) {
+      byte[] tail = Bytes.toBytes(Integer.toString(i));
+      ret[i] = Bytes.add(base, tail);
+    }
+    return ret;
+  }
+
 }
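
For reference, a hedged example of the new helper; note the suffixes are decimal
strings, so the generated sequences sort lexicographically only for n <= 10:

    // makeNAscii(Bytes.toBytes("q"), 3) -> byte arrays for "q0", "q1", "q2"
    byte[][] quals = HTestConst.makeNAscii(Bytes.toBytes("q"), 3);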

Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIntraRowPagination.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIntraRowPagination.java?rev=1356995&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIntraRowPagination.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIntraRowPagination.java Tue Jul  3 22:36:20 2012
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.HTestConst;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test scan/get offset and limit settings within one row through the HRegion API.
+ */
+@Category(SmallTests.class)
+public class TestIntraRowPagination {
+
+  private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  /**
+   * Test a scan with maxResultsPerCF and rowOffsetPerCF set, through the HRegion API
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testScanLimitAndOffset() throws Exception {
+    //byte [] TABLE = HTestConst.DEFAULT_TABLE_BYTES;
+    byte [][] ROWS = HTestConst.makeNAscii(HTestConst.DEFAULT_ROW_BYTES, 2);
+    byte [][] FAMILIES = HTestConst.makeNAscii(HTestConst.DEFAULT_CF_BYTES, 3);
+    byte [][] QUALIFIERS = HTestConst.makeNAscii(HTestConst.DEFAULT_QUALIFIER_BYTES, 10);
+
+    HTableDescriptor htd = new HTableDescriptor(HTestConst.DEFAULT_TABLE_BYTES);
+    HRegionInfo info = new HRegionInfo(HTestConst.DEFAULT_TABLE_BYTES, null, null, false);
+    for (byte[] family : FAMILIES) {
+      HColumnDescriptor hcd = new HColumnDescriptor(family);
+      htd.addFamily(hcd);
+    }
+    HRegion region =
+        HRegion.createHRegion(info, TEST_UTIL.getDataTestDir(), TEST_UTIL.getConfiguration(), htd);
+    try {
+      Put put;
+      Scan scan;
+      Result result;
+      boolean toLog = true;
+
+      List<KeyValue> kvListExp = new ArrayList<KeyValue>();
+
+      int storeOffset = 1;
+      int storeLimit = 3;
+      for (int r = 0; r < ROWS.length; r++) {
+        put = new Put(ROWS[r]);
+        for (int c = 0; c < FAMILIES.length; c++) {
+          for (int q = 0; q < QUALIFIERS.length; q++) {
+            KeyValue kv = new KeyValue(ROWS[r], FAMILIES[c], QUALIFIERS[q], 1,
+                HTestConst.DEFAULT_VALUE_BYTES);
+            put.add(kv);
+            if (storeOffset <= q && q < storeOffset + storeLimit) {
+              kvListExp.add(kv);
+            }
+          }
+        }
+        region.put(put);
+      }
+
+      scan = new Scan();
+      scan.setRowOffsetPerColumnFamily(storeOffset);
+      scan.setMaxResultsPerColumnFamily(storeLimit);
+      RegionScanner scanner = region.getScanner(scan);
+      List<KeyValue> kvListScan = new ArrayList<KeyValue>();
+      List<KeyValue> results = new ArrayList<KeyValue>();
+      while (scanner.next(results) || !results.isEmpty()) {
+        kvListScan.addAll(results);
+        results.clear();
+      }
+      result = new Result(kvListScan);
+      TestScannersFromClientSide.verifyResult(result, kvListExp, toLog,
+          "Testing scan with storeOffset and storeLimit");
+    } finally {
+      region.close();
+    }
+  }
+
+  @org.junit.Rule
+  public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
+    new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
+}
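
A quick sanity check of the expectation built above: with storeOffset = 1 and
storeLimit = 3 over QUALIFIERS 0..9, the condition storeOffset <= q && q <
storeOffset + storeLimit keeps exactly qualifiers 1, 2 and 3, so kvListExp holds
2 rows x 3 families x 3 qualifiers = 18 KeyValues, which is what the scan with the
same offset/limit settings must produce.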

Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java?rev=1356995&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java Tue Jul  3 22:36:20 2012
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HTestConst;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
+import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * A client-side test, mostly testing scanners with various parameters.
+ */
+@Category(MediumTests.class)
+public class TestScannersFromClientSide {
+  private static final Log LOG = LogFactory.getLog(TestScannersFromClientSide.class);
+
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static byte [] ROW = Bytes.toBytes("testRow");
+  private static byte [] FAMILY = Bytes.toBytes("testFamily");
+  private static byte [] QUALIFIER = Bytes.toBytes("testQualifier");
+  private static byte [] VALUE = Bytes.toBytes("testValue");
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster(3);
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @Before
+  public void setUp() throws Exception {
+    // Nothing to do.
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @After
+  public void tearDown() throws Exception {
+    // Nothing to do.
+  }
+
+  /**
+   * Test from client side for scan batching
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testScanBatch() throws Exception {
+    byte [] TABLE = Bytes.toBytes("testScanBatch");
+    byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 8);
+
+    HTable ht = TEST_UTIL.createTable(TABLE, FAMILY);
+
+    Put put;
+    Scan scan;
+    Delete delete;
+    Result result;
+    ResultScanner scanner;
+    boolean toLog = true;
+    List<KeyValue> kvListExp;
+
+    // table: row, family, c0:0, c1:1, ..., c7:7
+    put = new Put(ROW);
+    for (int i=0; i < QUALIFIERS.length; i++) {
+      KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[i], i, VALUE);
+      put.add(kv);
+    }
+    ht.put(put);
+
+    // table: row, family, c0:0, c1:1, ..., c5:5, c6:2, c6:6, c7:7
+    put = new Put(ROW);
+    KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[6], 2, VALUE);
+    put.add(kv);
+    ht.put(put);
+
+    // delete the whole family up to and including ts 3
+    delete = new Delete(ROW);
+    delete.deleteFamily(FAMILY, 3);
+    ht.delete(delete);
+
+    // without batch
+    scan = new Scan(ROW);
+    scan.setMaxVersions();
+    scanner = ht.getScanner(scan);
+
+    // After the family delete, only c4:4, c5:5, c6:6, c7:7 remain
+    kvListExp = new ArrayList<KeyValue>();
+    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[4], 4, VALUE));
+    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[5], 5, VALUE));
+    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[6], 6, VALUE));
+    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[7], 7, VALUE));
+    result = scanner.next();
+    verifyResult(result, kvListExp, toLog, "Testing scan without batch");
+
+    // with batch
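+    // With batch size 2, the four remaining KVs of the row are split
+    // across two next() calls of two KVs each.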
+    scan = new Scan(ROW);
+    scan.setMaxVersions();
+    scan.setBatch(2);
+    scanner = ht.getScanner(scan);
+
+    // First batch: c4:4, c5:5
+    kvListExp = new ArrayList<KeyValue>();
+    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[4], 4, VALUE));
+    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[5], 5, VALUE));
+    result = scanner.next();
+    verifyResult(result, kvListExp, toLog, "Testing first batch of scan");
+
+    // Second batch: c6:6, c7:7
+    kvListExp = new ArrayList<KeyValue>();
+    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[6], 6, VALUE));
+    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[7], 7, VALUE));
+    result = scanner.next();
+    verifyResult(result, kvListExp, toLog, "Testing second batch of scan");
+
+  }
+
+  /**
+   * Test from client side for get with maxResultsPerCF set
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testGetMaxResults() throws Exception {
+    byte [] TABLE = Bytes.toBytes("testGetMaxResults");
+    byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
+    byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20);
+
+    HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES);
+
+    Get get;
+    Put put;
+    Result result;
+    boolean toLog = true;
+    List<KeyValue> kvListExp;
+
+    kvListExp = new ArrayList<KeyValue>();
+    // Insert one CF for row[0]
+    put = new Put(ROW);
+    for (int i=0; i < 10; i++) {
+      KeyValue kv = new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE);
+      put.add(kv);
+      kvListExp.add(kv);
+    }
+    ht.put(put);
+
+    get = new Get(ROW);
+    result = ht.get(get);
+    verifyResult(result, kvListExp, toLog, "Testing without setting maxResults");
+
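+    // maxResults 2 per CF: only the two lexicographically-first qualifiers
+    // (q0, q1) should come back.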
+    get = new Get(ROW);
+    get.setMaxResultsPerColumnFamily(2);
+    result = ht.get(get);
+    kvListExp = new ArrayList<KeyValue>();
+    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[0], 1, VALUE));
+    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[1], 1, VALUE));
+    verifyResult(result, kvListExp, toLog, "Testing basic setMaxResults");
+
+    // Filters: ColumnRangeFilter
+    get = new Get(ROW);
+    get.setMaxResultsPerColumnFamily(5);
+    get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, QUALIFIERS[5],
+                                        true));
+    result = ht.get(get);
+    kvListExp = new ArrayList<KeyValue>();
+    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[2], 1, VALUE));
+    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[3], 1, VALUE));
+    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[4], 1, VALUE));
+    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[5], 1, VALUE));
+    verifyResult(result, kvListExp, toLog, "Testing single CF with CRF");
+
+    // Insert two more CFs for the row:
+    // 20 columns for CF2, 10 columns for CF1
+    put = new Put(ROW);
+    for (int i=0; i < QUALIFIERS.length; i++) {
+      KeyValue kv = new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE);
+      put.add(kv);
+    }
+    ht.put(put);
+
+    put = new Put(ROW);
+    for (int i=0; i < 10; i++) {
+      KeyValue kv = new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE);
+      put.add(kv);
+    }
+    ht.put(put);
+
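+    // maxResults 12 per CF: CF1 returns all 10 of its columns, while CF2 is
+    // trimmed to the 12 lexicographically-first of its 20.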
+    get = new Get(ROW);
+    get.setMaxResultsPerColumnFamily(12);
+    get.addFamily(FAMILIES[1]);
+    get.addFamily(FAMILIES[2]);
+    result = ht.get(get);
+    kvListExp = new ArrayList<KeyValue>();
+    // Expected: CF1: q0..q9; CF2: q0, q1, q10..q19 (lexicographic order)
+    for (int i=0; i < 10; i++) {
+      kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE));
+    }
+    for (int i=0; i < 2; i++) {
+      kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
+    }
+    for (int i=10; i < 20; i++) {
+      kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
+    }
+    verifyResult(result, kvListExp, toLog, "Testing multiple CFs");
+
+    // Filters: ColumnRangeFilter and ColumnPrefixFilter
+    get = new Get(ROW);
+    get.setMaxResultsPerColumnFamily(3);
+    get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, null, true));
+    result = ht.get(get);
+    kvListExp = new ArrayList<KeyValue>();
+    for (int i=2; i < 5; i++) {
+      kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE));
+    }
+    for (int i=2; i < 5; i++) {
+      kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE));
+    }
+    for (int i=2; i < 5; i++) {
+      kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
+    }
+    verifyResult(result, kvListExp, toLog, "Testing multiple CFs + CRF");
+
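+    // The prefix QUALIFIERS[1] matches q1 and q10..q19; with maxResults 7,
+    // CF2 is trimmed to q1, q10..q15.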
+    get = new Get(ROW);
+    get.setMaxResultsPerColumnFamily(7);
+    get.setFilter(new ColumnPrefixFilter(QUALIFIERS[1]));
+    result = ht.get(get);
+    kvListExp = new ArrayList<KeyValue>();
+    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[1], 1, VALUE));
+    kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[1], 1, VALUE));
+    kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[1], 1, VALUE));
+    for (int i=10; i < 16; i++) {
+      kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
+    }
+    verifyResult(result, kvListExp, toLog, "Testing multiple CFs + PFF");
+
+  }
+
+  /**
+   * Test from client side for scan with maxResultsPerCF set
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testScanMaxResults() throws Exception {
+    byte [] TABLE = Bytes.toBytes("testScanMaxResults");
+    byte [][] ROWS = HTestConst.makeNAscii(ROW, 2);
+    byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
+    byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 10);
+
+    HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES);
+
+    Put put;
+    Scan scan;
+    Result result;
+    boolean toLog = true;
+    List<KeyValue> kvListExp, kvListScan;
+
+    kvListExp = new ArrayList<KeyValue>();
+
+    for (int r=0; r < ROWS.length; r++) {
+      put = new Put(ROWS[r]);
+      for (int c=0; c < FAMILIES.length; c++) {
+        for (int q=0; q < QUALIFIERS.length; q++) {
+          KeyValue kv = new KeyValue(ROWS[r], FAMILIES[c], QUALIFIERS[q], 1, VALUE);
+          put.add(kv);
+          if (q < 4) {
+            kvListExp.add(kv);
+          }
+        }
+      }
+      ht.put(put);
+    }
+
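+    // maxResults 4 per CF: every store of every row should return only
+    // q0..q3, matching the expected list built above.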
+    scan = new Scan();
+    scan.setMaxResultsPerColumnFamily(4);
+    ResultScanner scanner = ht.getScanner(scan);
+    kvListScan = new ArrayList<KeyValue>();
+    while ((result = scanner.next()) != null) {
+      for (KeyValue kv : result.list()) {
+        kvListScan.add(kv);
+      }
+    }
+    result = new Result(kvListScan);
+    verifyResult(result, kvListExp, toLog, "Testing scan with maxResults");
+
+  }
+
+  /**
+   * Test from client side for get with rowOffset
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testGetRowOffset() throws Exception {
+    byte [] TABLE = Bytes.toBytes("testGetRowOffset");
+    byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
+    byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20);
+
+    HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES);
+
+    Get get;
+    Put put;
+    Result result;
+    boolean toLog = true;
+    List<KeyValue> kvListExp;
+
+    // Insert one CF for row
+    kvListExp = new ArrayList<KeyValue>();
+    put = new Put(ROW);
+    for (int i=0; i < 10; i++) {
+      KeyValue kv = new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE);
+      put.add(kv);
+      // the first two kvs will be skipped by the offset-2 get below
+      if (i < 2) continue;
+      kvListExp.add(kv);
+    }
+    ht.put(put);
+
+    // setting offset to 2
+    get = new Get(ROW);
+    get.setRowOffsetPerColumnFamily(2);
+    result = ht.get(get);
+    verifyResult(result, kvListExp, toLog, "Testing basic setRowOffset");
+
+    // setting offset to 20
+    get = new Get(ROW);
+    get.setRowOffsetPerColumnFamily(20);
+    result = ht.get(get);
+    kvListExp = new ArrayList<KeyValue>();
+    verifyResult(result, kvListExp, toLog, "Testing offset > #kvs");
+
+    // offset + maxResultsPerCF
+    get = new Get(ROW);
+    get.setRowOffsetPerColumnFamily(4);
+    get.setMaxResultsPerColumnFamily(5);
+    result = ht.get(get);
+    kvListExp = new ArrayList<KeyValue>();
+    for (int i=4; i < 9; i++) {
+      kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE));
+    }
+    verifyResult(result, kvListExp, toLog,
+      "Testing offset + setMaxResultsPerCF");
+
+    // Filters: ColumnRangeFilter
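+    // As the expected list below shows, the per-CF offset counts cells that
+    // pass the filter, so the first match (q2) is skipped.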
+    get = new Get(ROW);
+    get.setRowOffsetPerColumnFamily(1);
+    get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, QUALIFIERS[5],
+                                        true));
+    result = ht.get(get);
+    kvListExp = new ArrayList<KeyValue>();
+    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[3], 1, VALUE));
+    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[4], 1, VALUE));
+    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[5], 1, VALUE));
+    verifyResult(result, kvListExp, toLog, "Testing offset with CRF");
+
+    // Insert into two more CFs for row
+    // 10 columns for CF2, 10 columns for CF1
+    for (int j=2; j > 0; j--) {
+      put = new Put(ROW);
+      for (int i=0; i < 10; i++) {
+        KeyValue kv = new KeyValue(ROW, FAMILIES[j], QUALIFIERS[i], 1, VALUE);
+        put.add(kv);
+      }
+      ht.put(put);
+    }
+
+    get = new Get(ROW);
+    get.setRowOffsetPerColumnFamily(4);
+    get.setMaxResultsPerColumnFamily(2);
+    get.addFamily(FAMILIES[1]);
+    get.addFamily(FAMILIES[2]);
+    result = ht.get(get);
+    kvListExp = new ArrayList<KeyValue>();
+    // Expected: CF1: q4, q5; CF2: q4, q5
+    kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[4], 1, VALUE));
+    kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[5], 1, VALUE));
+    kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[4], 1, VALUE));
+    kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[5], 1, VALUE));
+    verifyResult(result, kvListExp, toLog,
+       "Testing offset + multiple CFs + maxResults");
+
+  }
+
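+  /**
+   * Verifies that {@code result} contains exactly the KeyValues in
+   * {@code expKvList}, logging both sides when {@code toLog} is set.
+   */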
+  static void verifyResult(Result result, List<KeyValue> expKvList, boolean toLog,
+      String msg) {
+
+    LOG.info(msg);
+    LOG.info("Expected count: " + expKvList.size());
+    LOG.info("Actual count: " + result.size());
+    if (expKvList.size() == 0) {
+      assertEquals("Expected an empty result", 0, result.size());
+      return;
+    }
+
+    int i = 0;
+    for (KeyValue kv : result.raw()) {
+      if (i >= expKvList.size()) {
+        break;  // we will check the size later
+      }
+
+      KeyValue kvExp = expKvList.get(i++);
+      if (toLog) {
+        LOG.info("actual kv: " + kv);
+        LOG.info("expected kv: " + kvExp);
+      }
+      assertEquals("KeyValue mismatch", kvExp, kv);
+    }
+
+    assertEquals(expKvList.size(), result.size());
+  }
+
+  @org.junit.Rule
+  public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
+    new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
+
+}