You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2013/01/09 22:37:36 UTC

svn commit: r1431103 - in /hbase/trunk: hbase-common/src/main/java/org/apache/hadoop/hbase/ hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ hbase-protocol/src/main/protobuf/ hbase-server/src/main/java/org/apache/hadoop/hbase/cl...

Author: tedyu
Date: Wed Jan  9 21:37:35 2013
New Revision: 1431103

URL: http://svn.apache.org/viewvc?rev=1431103&view=rev
Log:
HBASE-5416 Improve performance of scans with some kind of filters (Max Lapan and Sergey)


Added:
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestJoinedScanners.java
Modified:
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
    hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
    hbase/trunk/hbase-protocol/src/main/protobuf/Client.proto
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Scan.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/Filter.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterBase.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterList.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterWrapper.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueExcludeFilter.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestSingleColumnValueExcludeFilter.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java

Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java?rev=1431103&r1=1431102&r2=1431103&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java Wed Jan  9 21:37:35 2013
@@ -2049,6 +2049,19 @@ public class KeyValue implements Cell, H
   }
 
   /**
+   * Create a KeyValue that is smaller than all other possible KeyValues
+   * for the given row. That is any (valid) KeyValue on 'row' would sort
+   * _after_ the result.
+   *
+   * @param row - row key (arbitrary byte array)
+   * @return First possible KeyValue on passed <code>row</code>
+   */
+  public static KeyValue createFirstOnRow(final byte [] row, int roffset, short rlength) {
+    return new KeyValue(row, roffset, rlength,
+        null, 0, 0, null, 0, 0, HConstants.LATEST_TIMESTAMP, Type.Maximum, null, 0, 0);
+  }
+
+  /**
    * Creates a KeyValue that is smaller than all other KeyValues that
    * are older than the passed timestamp.
    * @param row - row key (arbitrary byte array)

Modified: hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java?rev=1431103&r1=1431102&r2=1431103&view=diff
==============================================================================
--- hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java (original)
+++ hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java Wed Jan  9 21:37:35 2013
@@ -8992,6 +8992,10 @@ public final class ClientProtos {
     // optional uint32 storeOffset = 12;
     boolean hasStoreOffset();
     int getStoreOffset();
+    
+    // optional bool loadColumnFamiliesOnDemand = 13;
+    boolean hasLoadColumnFamiliesOnDemand();
+    boolean getLoadColumnFamiliesOnDemand();
   }
   public static final class Scan extends
       com.google.protobuf.GeneratedMessage
@@ -9170,6 +9174,16 @@ public final class ClientProtos {
       return storeOffset_;
     }
     
+    // optional bool loadColumnFamiliesOnDemand = 13;
+    public static final int LOADCOLUMNFAMILIESONDEMAND_FIELD_NUMBER = 13;
+    private boolean loadColumnFamiliesOnDemand_;
+    public boolean hasLoadColumnFamiliesOnDemand() {
+      return ((bitField0_ & 0x00000400) == 0x00000400);
+    }
+    public boolean getLoadColumnFamiliesOnDemand() {
+      return loadColumnFamiliesOnDemand_;
+    }
+    
     private void initFields() {
       column_ = java.util.Collections.emptyList();
       attribute_ = java.util.Collections.emptyList();
@@ -9183,6 +9197,7 @@ public final class ClientProtos {
       maxResultSize_ = 0L;
       storeLimit_ = 0;
       storeOffset_ = 0;
+      loadColumnFamiliesOnDemand_ = false;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -9250,6 +9265,9 @@ public final class ClientProtos {
       if (((bitField0_ & 0x00000200) == 0x00000200)) {
         output.writeUInt32(12, storeOffset_);
       }
+      if (((bitField0_ & 0x00000400) == 0x00000400)) {
+        output.writeBool(13, loadColumnFamiliesOnDemand_);
+      }
       getUnknownFields().writeTo(output);
     }
     
@@ -9307,6 +9325,10 @@ public final class ClientProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeUInt32Size(12, storeOffset_);
       }
+      if (((bitField0_ & 0x00000400) == 0x00000400)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBoolSize(13, loadColumnFamiliesOnDemand_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -9384,6 +9406,11 @@ public final class ClientProtos {
         result = result && (getStoreOffset()
             == other.getStoreOffset());
       }
+      result = result && (hasLoadColumnFamiliesOnDemand() == other.hasLoadColumnFamiliesOnDemand());
+      if (hasLoadColumnFamiliesOnDemand()) {
+        result = result && (getLoadColumnFamiliesOnDemand()
+            == other.getLoadColumnFamiliesOnDemand());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -9441,6 +9468,10 @@ public final class ClientProtos {
         hash = (37 * hash) + STOREOFFSET_FIELD_NUMBER;
         hash = (53 * hash) + getStoreOffset();
       }
+      if (hasLoadColumnFamiliesOnDemand()) {
+        hash = (37 * hash) + LOADCOLUMNFAMILIESONDEMAND_FIELD_NUMBER;
+        hash = (53 * hash) + hashBoolean(getLoadColumnFamiliesOnDemand());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       return hash;
     }
@@ -9601,6 +9632,8 @@ public final class ClientProtos {
         bitField0_ = (bitField0_ & ~0x00000400);
         storeOffset_ = 0;
         bitField0_ = (bitField0_ & ~0x00000800);
+        loadColumnFamiliesOnDemand_ = false;
+        bitField0_ = (bitField0_ & ~0x00001000);
         return this;
       }
       
@@ -9705,6 +9738,10 @@ public final class ClientProtos {
           to_bitField0_ |= 0x00000200;
         }
         result.storeOffset_ = storeOffset_;
+        if (((from_bitField0_ & 0x00001000) == 0x00001000)) {
+          to_bitField0_ |= 0x00000400;
+        }
+        result.loadColumnFamiliesOnDemand_ = loadColumnFamiliesOnDemand_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -9803,6 +9840,9 @@ public final class ClientProtos {
         if (other.hasStoreOffset()) {
           setStoreOffset(other.getStoreOffset());
         }
+        if (other.hasLoadColumnFamiliesOnDemand()) {
+          setLoadColumnFamiliesOnDemand(other.getLoadColumnFamiliesOnDemand());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -9922,6 +9962,11 @@ public final class ClientProtos {
               storeOffset_ = input.readUInt32();
               break;
             }
+            case 104: {
+              bitField0_ |= 0x00001000;
+              loadColumnFamiliesOnDemand_ = input.readBool();
+              break;
+            }
           }
         }
       }
@@ -10654,6 +10699,27 @@ public final class ClientProtos {
         return this;
       }
       
+      // optional bool loadColumnFamiliesOnDemand = 13;
+      private boolean loadColumnFamiliesOnDemand_ ;
+      public boolean hasLoadColumnFamiliesOnDemand() {
+        return ((bitField0_ & 0x00001000) == 0x00001000);
+      }
+      public boolean getLoadColumnFamiliesOnDemand() {
+        return loadColumnFamiliesOnDemand_;
+      }
+      public Builder setLoadColumnFamiliesOnDemand(boolean value) {
+        bitField0_ |= 0x00001000;
+        loadColumnFamiliesOnDemand_ = value;
+        onChanged();
+        return this;
+      }
+      public Builder clearLoadColumnFamiliesOnDemand() {
+        bitField0_ = (bitField0_ & ~0x00001000);
+        loadColumnFamiliesOnDemand_ = false;
+        onChanged();
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:Scan)
     }
     
@@ -21446,7 +21512,7 @@ public final class ClientProtos {
       "\006region\030\001 \002(\0132\020.RegionSpecifier\022\027\n\006mutat" +
       "e\030\002 \002(\0132\007.Mutate\022\035\n\tcondition\030\003 \001(\0132\n.Co" +
       "ndition\"<\n\016MutateResponse\022\027\n\006result\030\001 \001(" +
-      "\0132\007.Result\022\021\n\tprocessed\030\002 \001(\010\"\243\002\n\004Scan\022\027" +
+      "\0132\007.Result\022\021\n\tprocessed\030\002 \001(\010\"\307\002\n\004Scan\022\027" +
       "\n\006column\030\001 \003(\0132\007.Column\022!\n\tattribute\030\002 \003" +
       "(\0132\016.NameBytesPair\022\020\n\010startRow\030\003 \001(\014\022\017\n\007" +
       "stopRow\030\004 \001(\014\022\027\n\006filter\030\005 \001(\0132\007.Filter\022\035" +
@@ -21454,50 +21520,50 @@ public final class ClientProtos {
       "ions\030\007 \001(\r:\0011\022\031\n\013cacheBlocks\030\010 \001(\010:\004true" +
       "\022\021\n\tbatchSize\030\t \001(\r\022\025\n\rmaxResultSize\030\n \001" +
       "(\004\022\022\n\nstoreLimit\030\013 \001(\r\022\023\n\013storeOffset\030\014 " +
-      "\001(\r\"\230\001\n\013ScanRequest\022 \n\006region\030\001 \001(\0132\020.Re" +
-      "gionSpecifier\022\023\n\004scan\030\002 \001(\0132\005.Scan\022\021\n\tsc" +
-      "annerId\030\003 \001(\004\022\024\n\014numberOfRows\030\004 \001(\r\022\024\n\014c" +
-      "loseScanner\030\005 \001(\010\022\023\n\013nextCallSeq\030\006 \001(\004\"\\" +
-      "\n\014ScanResponse\022\027\n\006result\030\001 \003(\0132\007.Result\022" +
-      "\021\n\tscannerId\030\002 \001(\004\022\023\n\013moreResults\030\003 \001(\010\022" +
-      "\013\n\003ttl\030\004 \001(\r\"?\n\016LockRowRequest\022 \n\006region",
-      "\030\001 \002(\0132\020.RegionSpecifier\022\013\n\003row\030\002 \003(\014\".\n" +
-      "\017LockRowResponse\022\016\n\006lockId\030\001 \002(\004\022\013\n\003ttl\030" +
-      "\002 \001(\r\"D\n\020UnlockRowRequest\022 \n\006region\030\001 \002(" +
-      "\0132\020.RegionSpecifier\022\016\n\006lockId\030\002 \002(\004\"\023\n\021U" +
-      "nlockRowResponse\"\260\001\n\024BulkLoadHFileReques" +
-      "t\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\0224\n\nf" +
-      "amilyPath\030\002 \003(\0132 .BulkLoadHFileRequest.F" +
-      "amilyPath\022\024\n\014assignSeqNum\030\003 \001(\010\032*\n\nFamil" +
-      "yPath\022\016\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025B" +
-      "ulkLoadHFileResponse\022\016\n\006loaded\030\001 \002(\010\"_\n\026",
-      "CoprocessorServiceCall\022\013\n\003row\030\001 \002(\014\022\023\n\013s" +
-      "erviceName\030\002 \002(\t\022\022\n\nmethodName\030\003 \002(\t\022\017\n\007" +
-      "request\030\004 \002(\014\"d\n\031CoprocessorServiceReque" +
-      "st\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022%\n\004" +
-      "call\030\002 \002(\0132\027.CoprocessorServiceCall\"]\n\032C" +
-      "oprocessorServiceResponse\022 \n\006region\030\001 \002(" +
-      "\0132\020.RegionSpecifier\022\035\n\005value\030\002 \002(\0132\016.Nam" +
-      "eBytesPair\"9\n\013MultiAction\022\027\n\006mutate\030\001 \001(" +
-      "\0132\007.Mutate\022\021\n\003get\030\002 \001(\0132\004.Get\"I\n\014ActionR" +
-      "esult\022\026\n\005value\030\001 \001(\0132\007.Result\022!\n\texcepti",
-      "on\030\002 \001(\0132\016.NameBytesPair\"^\n\014MultiRequest" +
-      "\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\034\n\006ac" +
-      "tion\030\002 \003(\0132\014.MultiAction\022\016\n\006atomic\030\003 \001(\010" +
-      "\".\n\rMultiResponse\022\035\n\006result\030\001 \003(\0132\r.Acti" +
-      "onResult2\223\003\n\rClientService\022 \n\003get\022\013.GetR" +
-      "equest\032\014.GetResponse\022)\n\006mutate\022\016.MutateR" +
-      "equest\032\017.MutateResponse\022#\n\004scan\022\014.ScanRe" +
-      "quest\032\r.ScanResponse\022,\n\007lockRow\022\017.LockRo" +
-      "wRequest\032\020.LockRowResponse\0222\n\tunlockRow\022" +
-      "\021.UnlockRowRequest\032\022.UnlockRowResponse\022>",
-      "\n\rbulkLoadHFile\022\025.BulkLoadHFileRequest\032\026" +
-      ".BulkLoadHFileResponse\022F\n\013execService\022\032." +
-      "CoprocessorServiceRequest\032\033.CoprocessorS" +
-      "erviceResponse\022&\n\005multi\022\r.MultiRequest\032\016" +
-      ".MultiResponseBB\n*org.apache.hadoop.hbas" +
-      "e.protobuf.generatedB\014ClientProtosH\001\210\001\001\240" +
-      "\001\001"
+      "\001(\r\022\"\n\032loadColumnFamiliesOnDemand\030\r \001(\010\"" +
+      "\230\001\n\013ScanRequest\022 \n\006region\030\001 \001(\0132\020.Region" +
+      "Specifier\022\023\n\004scan\030\002 \001(\0132\005.Scan\022\021\n\tscanne" +
+      "rId\030\003 \001(\004\022\024\n\014numberOfRows\030\004 \001(\r\022\024\n\014close" +
+      "Scanner\030\005 \001(\010\022\023\n\013nextCallSeq\030\006 \001(\004\"\\\n\014Sc" +
+      "anResponse\022\027\n\006result\030\001 \003(\0132\007.Result\022\021\n\ts" +
+      "cannerId\030\002 \001(\004\022\023\n\013moreResults\030\003 \001(\010\022\013\n\003t",
+      "tl\030\004 \001(\r\"?\n\016LockRowRequest\022 \n\006region\030\001 \002" +
+      "(\0132\020.RegionSpecifier\022\013\n\003row\030\002 \003(\014\".\n\017Loc" +
+      "kRowResponse\022\016\n\006lockId\030\001 \002(\004\022\013\n\003ttl\030\002 \001(" +
+      "\r\"D\n\020UnlockRowRequest\022 \n\006region\030\001 \002(\0132\020." +
+      "RegionSpecifier\022\016\n\006lockId\030\002 \002(\004\"\023\n\021Unloc" +
+      "kRowResponse\"\260\001\n\024BulkLoadHFileRequest\022 \n" +
+      "\006region\030\001 \002(\0132\020.RegionSpecifier\0224\n\nfamil" +
+      "yPath\030\002 \003(\0132 .BulkLoadHFileRequest.Famil" +
+      "yPath\022\024\n\014assignSeqNum\030\003 \001(\010\032*\n\nFamilyPat" +
+      "h\022\016\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkL",
+      "oadHFileResponse\022\016\n\006loaded\030\001 \002(\010\"_\n\026Copr" +
+      "ocessorServiceCall\022\013\n\003row\030\001 \002(\014\022\023\n\013servi" +
+      "ceName\030\002 \002(\t\022\022\n\nmethodName\030\003 \002(\t\022\017\n\007requ" +
+      "est\030\004 \002(\014\"d\n\031CoprocessorServiceRequest\022 " +
+      "\n\006region\030\001 \002(\0132\020.RegionSpecifier\022%\n\004call" +
+      "\030\002 \002(\0132\027.CoprocessorServiceCall\"]\n\032Copro" +
+      "cessorServiceResponse\022 \n\006region\030\001 \002(\0132\020." +
+      "RegionSpecifier\022\035\n\005value\030\002 \002(\0132\016.NameByt" +
+      "esPair\"9\n\013MultiAction\022\027\n\006mutate\030\001 \001(\0132\007." +
+      "Mutate\022\021\n\003get\030\002 \001(\0132\004.Get\"I\n\014ActionResul",
+      "t\022\026\n\005value\030\001 \001(\0132\007.Result\022!\n\texception\030\002" +
+      " \001(\0132\016.NameBytesPair\"^\n\014MultiRequest\022 \n\006" +
+      "region\030\001 \002(\0132\020.RegionSpecifier\022\034\n\006action" +
+      "\030\002 \003(\0132\014.MultiAction\022\016\n\006atomic\030\003 \001(\010\".\n\r" +
+      "MultiResponse\022\035\n\006result\030\001 \003(\0132\r.ActionRe" +
+      "sult2\223\003\n\rClientService\022 \n\003get\022\013.GetReque" +
+      "st\032\014.GetResponse\022)\n\006mutate\022\016.MutateReque" +
+      "st\032\017.MutateResponse\022#\n\004scan\022\014.ScanReques" +
+      "t\032\r.ScanResponse\022,\n\007lockRow\022\017.LockRowReq" +
+      "uest\032\020.LockRowResponse\0222\n\tunlockRow\022\021.Un",
+      "lockRowRequest\032\022.UnlockRowResponse\022>\n\rbu" +
+      "lkLoadHFile\022\025.BulkLoadHFileRequest\032\026.Bul" +
+      "kLoadHFileResponse\022F\n\013execService\022\032.Copr" +
+      "ocessorServiceRequest\032\033.CoprocessorServi" +
+      "ceResponse\022&\n\005multi\022\r.MultiRequest\032\016.Mul" +
+      "tiResponseBB\n*org.apache.hadoop.hbase.pr" +
+      "otobuf.generatedB\014ClientProtosH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -21597,7 +21663,7 @@ public final class ClientProtos {
           internal_static_Scan_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_Scan_descriptor,
-              new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", },
+              new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", },
               org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.class,
               org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.Builder.class);
           internal_static_ScanRequest_descriptor =

Modified: hbase/trunk/hbase-protocol/src/main/protobuf/Client.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-protocol/src/main/protobuf/Client.proto?rev=1431103&r1=1431102&r2=1431103&view=diff
==============================================================================
--- hbase/trunk/hbase-protocol/src/main/protobuf/Client.proto (original)
+++ hbase/trunk/hbase-protocol/src/main/protobuf/Client.proto Wed Jan  9 21:37:35 2013
@@ -186,6 +186,7 @@ message Scan {
   optional uint64 maxResultSize = 10;
   optional uint32 storeLimit = 11;
   optional uint32 storeOffset = 12;
+  optional bool loadColumnFamiliesOnDemand = 13; /* DO NOT add defaults to loadColumnFamiliesOnDemand. */
 }
 
 /**

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Scan.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Scan.java?rev=1431103&r1=1431102&r2=1431103&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Scan.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Scan.java Wed Jan  9 21:37:35 2013
@@ -92,7 +92,7 @@ public class Scan extends OperationWithA
 
   private int storeLimit = -1;
   private int storeOffset = 0;
-  
+
   // If application wants to collect scan metrics, it needs to
   // call scan.setAttribute(SCAN_ATTRIBUTES_ENABLE, Bytes.toBytes(Boolean.TRUE))
   static public final String SCAN_ATTRIBUTES_METRICS_ENABLE =
@@ -110,6 +110,7 @@ public class Scan extends OperationWithA
   private TimeRange tr = new TimeRange();
   private Map<byte [], NavigableSet<byte []>> familyMap =
     new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR);
+  private Boolean loadColumnFamiliesOnDemand = null;
 
   /**
    * Create a Scan operation across all rows.
@@ -159,6 +160,7 @@ public class Scan extends OperationWithA
     maxResultSize = scan.getMaxResultSize();
     cacheBlocks = scan.getCacheBlocks();
     filter = scan.getFilter(); // clone?
+    loadColumnFamiliesOnDemand = scan.getLoadColumnFamiliesOnDemandValue();
     TimeRange ctr = scan.getTimeRange();
     tr = new TimeRange(ctr.getMin(), ctr.getMax());
     Map<byte[], NavigableSet<byte[]>> fams = scan.getFamilyMap();
@@ -519,6 +521,41 @@ public class Scan extends OperationWithA
   }
 
   /**
+   * Set the value indicating whether loading CFs on demand should be allowed (cluster
+   * default is false). On-demand CF loading doesn't load column families until necessary, e.g.
+   * if you filter on one column, the other column family data will be loaded only for the rows
+   * that are included in result, not all rows like in normal case.
+   * With column-specific filters, like SingleColumnValueFilter w/filterIfMissing == true,
+   * this can deliver huge perf gains when there's a cf with lots of data; however, it can
+   * also lead to some inconsistent results, as follows:
+   * - if someone does a concurrent update to both column families in question you may get a row
+   *   that never existed, e.g. for { rowKey = 5, { cat_videos => 1 }, { video => "my cat" } }
+   *   someone puts rowKey 5 with { cat_videos => 0 }, { video => "my dog" }, concurrent scan
+   *   filtering on "cat_videos == 1" can get { rowKey = 5, { cat_videos => 1 },
+   *   { video => "my dog" } }.
+   * - if there's a concurrent split and you have more than 2 column families, some rows may be
+   *   missing some column families.
+   */
+  public void setLoadColumnFamiliesOnDemand(boolean value) {
+    this.loadColumnFamiliesOnDemand = value;
+  }
+
+  /**
+   * Get the raw loadColumnFamiliesOnDemand setting; if it's not set, can be null.
+   */
+  public Boolean getLoadColumnFamiliesOnDemandValue() {
+    return this.loadColumnFamiliesOnDemand;
+  }
+
+  /**
+   * Get the logical value indicating whether on-demand CF loading should be allowed.
+   */
+  public boolean doLoadColumnFamiliesOnDemand() {
+    return (this.loadColumnFamiliesOnDemand != null)
+      && this.loadColumnFamiliesOnDemand.booleanValue();
+  }
+
+  /**
    * Compile the table and column family (i.e. schema) information
    * into a String. Useful for parsing and aggregation by debugging,
    * logging, and administration tools.
@@ -547,7 +584,7 @@ public class Scan extends OperationWithA
    * Useful for debugging, logging, and administration tools.
    * @param maxCols a limit on the number of columns output prior to truncation
    * @return Map
-   */ 
+   */
   @Override
   public Map<String, Object> toMap(int maxCols) {
     // start with the fingerpring map and build on top of it
@@ -564,6 +601,7 @@ public class Scan extends OperationWithA
     map.put("caching", this.caching);
     map.put("maxResultSize", this.maxResultSize);
     map.put("cacheBlocks", this.cacheBlocks);
+    map.put("loadColumnFamiliesOnDemand", this.loadColumnFamiliesOnDemand);
     List<Long> timeRange = new ArrayList<Long>();
     timeRange.add(this.tr.getMin());
     timeRange.add(this.tr.getMax());

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/Filter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/Filter.java?rev=1431103&r1=1431102&r2=1431103&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/Filter.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/Filter.java Wed Jan  9 21:37:35 2013
@@ -172,6 +172,14 @@ public abstract class Filter {
   abstract public KeyValue getNextKeyHint(final KeyValue currentKV);
 
   /**
+   * Check whether the given column family is essential for the filter to check a row. Most
+   * filters always return true here. But some could have more sophisticated
+   * logic which could significantly reduce the scanning work by not even
+   * touching columns until we are 100% sure that their data is needed in the result.
+   */
+  abstract public boolean isFamilyEssential(byte[] name);
+
+  /**
    * @return The filter serialized using pb
    */
   abstract public byte [] toByteArray();

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterBase.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterBase.java?rev=1431103&r1=1431102&r2=1431103&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterBase.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterBase.java Wed Jan  9 21:37:35 2013
@@ -134,6 +134,16 @@ public abstract class FilterBase extends
   }
 
   /**
+   * By default, we require all scan's column families to be present. Our
+   * subclasses may be more precise.
+   *
+   * {@inheritDoc}
+   */
+  public boolean isFamilyEssential(byte[] name) {
+    return true;
+  }
+
+  /**
    * Given the filter's arguments it constructs the filter
    * <p>
    * @param filterArguments the filter's arguments

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterList.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterList.java?rev=1431103&r1=1431102&r2=1431103&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterList.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterList.java Wed Jan  9 21:37:35 2013
@@ -361,6 +361,16 @@ public class FilterList extends Filter {
   }
 
   @Override
+  public boolean isFamilyEssential(byte[] name) {
+    for (Filter filter : filters) {
+      if (filter.isFamilyEssential(name)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
   public String toString() {
     return toString(MAX_LOG_FILTERS);
   }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterWrapper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterWrapper.java?rev=1431103&r1=1431102&r2=1431103&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterWrapper.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterWrapper.java Wed Jan  9 21:37:35 2013
@@ -136,6 +136,11 @@ public class FilterWrapper extends Filte
     }
   }
 
+  @Override
+  public boolean isFamilyEssential(byte[] name) {
+    return filter.isFamilyEssential(name);
+  };
+
   /**
    * @param other
    * @return true if and only if the fields of the filter that are serialized

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueExcludeFilter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueExcludeFilter.java?rev=1431103&r1=1431102&r2=1431103&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueExcludeFilter.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueExcludeFilter.java Wed Jan  9 21:37:35 2013
@@ -31,6 +31,8 @@ import com.google.protobuf.InvalidProtoc
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.List;
+import java.util.Iterator;
 
 /**
  * A {@link Filter} that checks a single column value, but does not emit the
@@ -96,16 +98,22 @@ public class SingleColumnValueExcludeFil
       matchedColumn,filterIfMissing,latestVersionOnly);
   }
 
-  public ReturnCode filterKeyValue(KeyValue keyValue) {
-    ReturnCode superRetCode = super.filterKeyValue(keyValue);
-    if (superRetCode == ReturnCode.INCLUDE) {
+  // We clean the result row in filterRow to be consistent with the scanning process.
+  public boolean hasFilterRow() {
+   return true;
+  }
+
+  // Here we remove from the row all key values that belong to the tested column
+  public void filterRow(List<KeyValue> kvs) {
+    Iterator it = kvs.iterator();
+    while (it.hasNext()) {
+      KeyValue kv = (KeyValue)it.next();
       // If the current column is actually the tested column,
       // we will skip it instead.
-      if (keyValue.matchingColumn(this.columnFamily, this.columnQualifier)) {
-        return ReturnCode.SKIP;
+      if (kv.matchingColumn(this.columnFamily, this.columnQualifier)) {
+        it.remove();
       }
     }
-    return superRetCode;
   }
 
   public static Filter createFilterFromArguments(ArrayList<byte []> filterArguments) {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java?rev=1431103&r1=1431102&r2=1431103&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java Wed Jan  9 21:37:35 2013
@@ -379,6 +379,15 @@ public class SingleColumnValueFilter ext
       && this.getLatestVersionOnly() == other.getLatestVersionOnly();
   }
 
+  /**
+   * The only CF this filter needs is the given column family, so it is the only essential
+   * family in the whole scan. If filterIfMissing == false, all families are essential,
+   * because otherwise rows without any data in the filtered CF could be skipped.
+   */
+  public boolean isFamilyEssential(byte[] name) {
+    return !this.filterIfMissing || Bytes.equals(name, this.columnFamily);
+  }
+
   @Override
   public String toString() {
     return String.format("%s (%s, %s, %s, %s)",

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java?rev=1431103&r1=1431102&r2=1431103&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java Wed Jan  9 21:37:35 2013
@@ -138,6 +138,10 @@ public class SkipFilter extends FilterBa
     return getFilter().areSerializedFieldsEqual(other.getFilter());
   }
 
+  public boolean isFamilyEssential(byte[] name) {
+    return filter.isFamilyEssential(name);
+  }
+
   @Override
   public String toString() {
     return this.getClass().getSimpleName() + " " + this.filter.toString();

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java?rev=1431103&r1=1431102&r2=1431103&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java Wed Jan  9 21:37:35 2013
@@ -138,6 +138,10 @@ public class WhileMatchFilter extends Fi
     return getFilter().areSerializedFieldsEqual(other.getFilter());
   }
 
+  public boolean isFamilyEssential(byte[] name) {
+    return filter.isFamilyEssential(name);
+  }
+
   @Override
   public String toString() {
     return this.getClass().getSimpleName() + " " + this.filter.toString();

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java?rev=1431103&r1=1431102&r2=1431103&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java Wed Jan  9 21:37:35 2013
@@ -565,6 +565,10 @@ public final class ProtobufUtil {
     if (scan.getMaxResultSize() > 0) {
       scanBuilder.setMaxResultSize(scan.getMaxResultSize());
     }
+    Boolean loadColumnFamiliesOnDemand = scan.getLoadColumnFamiliesOnDemandValue();
+    if (loadColumnFamiliesOnDemand != null) {
+      scanBuilder.setLoadColumnFamiliesOnDemand(loadColumnFamiliesOnDemand.booleanValue());
+    }
     scanBuilder.setMaxVersions(scan.getMaxVersions());
     TimeRange timeRange = scan.getTimeRange();
     if (!timeRange.isAllTime()) {
@@ -648,6 +652,9 @@ public final class ProtobufUtil {
     if (proto.hasStoreOffset()) {
       scan.setRowOffsetPerColumnFamily(proto.getStoreOffset());
     }
+    if (proto.hasLoadColumnFamiliesOnDemand()) {
+      scan.setLoadColumnFamiliesOnDemand(proto.getLoadColumnFamiliesOnDemand());
+    }
     if (proto.hasTimeRange()) {
       HBaseProtos.TimeRange timeRange = proto.getTimeRange();
       long minStamp = 0;

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1431103&r1=1431102&r2=1431103&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Wed Jan  9 21:37:35 2013
@@ -184,6 +184,9 @@ public class HRegion implements HeapSize
   public static final Log LOG = LogFactory.getLog(HRegion.class);
   private static final String MERGEDIR = ".merges";
 
+  public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY =
+      "hbase.hregion.scan.loadColumnFamiliesOnDemand";
+
   final AtomicBoolean closed = new AtomicBoolean(false);
   /* Closing can take some time; use the closing flag if there is stuff we don't
    * want to do while in closing state; e.g. like offer this region up to the
@@ -281,6 +284,13 @@ public class HRegion implements HeapSize
   private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
 
   /**
+   * The default setting for whether to enable on-demand CF loading for
+   * scan requests to this region. Requests can override it.
+   */
+  private boolean isLoadingCfsOnDemandDefault = false;
+
+
+  /**
    * @return The smallest mvcc readPoint across all the scanners in this
    * region. Writes older than this readPoint, are included  in every
    * read operation.
@@ -455,6 +465,8 @@ public class HRegion implements HeapSize
       .add(htd.getValues());
     this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
                     DEFAULT_ROWLOCK_WAIT_DURATION);
+
+    this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, false);
     this.regionInfo = regionInfo;
     this.htableDescriptor = htd;
     this.rsServices = rsServices;
@@ -915,6 +927,10 @@ public class HRegion implements HeapSize
      return mvcc;
    }
 
+   public boolean isLoadingCfsOnDemandDefault() {
+     return this.isLoadingCfsOnDemandDefault;
+   }
+
   /**
    * Close down this HRegion.  Flush the cache, shut down each HStore, don't
    * service any more calls.
@@ -3383,6 +3399,15 @@ public class HRegion implements HeapSize
   class RegionScannerImpl implements RegionScanner {
     // Package local for testability
     KeyValueHeap storeHeap = null;
+    /** Heap of key-values that are not essential for the provided filters and are thus read
+     * on demand, if on-demand column family loading is enabled.*/
+    KeyValueHeap joinedHeap = null;
+    /**
+     * If the joined heap data gathering is interrupted due to scan limits, this will
+     * contain the row for which we are populating the values.*/
+    private KeyValue joinedContinuationRow = null;
+    // KeyValue indicating that limit is reached when scanning
+    private final KeyValue KV_LIMIT = new KeyValue();
     private final byte [] stopRow;
     private Filter filter;
     private List<KeyValue> results = new ArrayList<KeyValue>();
@@ -3429,7 +3454,10 @@ public class HRegion implements HeapSize
         scannerReadPoints.put(this, this.readPt);
       }
 
+      // Here we separate all scanners into two lists - scanners that provide data required
+      // by the filter to operate (scanners list) and all others (joinedScanners list).
       List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
+      List<KeyValueScanner> joinedScanners = new ArrayList<KeyValueScanner>();
       if (additionalScanners != null) {
         scanners.addAll(additionalScanners);
       }
@@ -3438,9 +3466,17 @@ public class HRegion implements HeapSize
           scan.getFamilyMap().entrySet()) {
         Store store = stores.get(entry.getKey());
         KeyValueScanner scanner = store.getScanner(scan, entry.getValue());
-        scanners.add(scanner);
+        if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
+          || this.filter.isFamilyEssential(entry.getKey())) {
+          scanners.add(scanner);
+        } else {
+          joinedScanners.add(scanner);
+        }
       }
       this.storeHeap = new KeyValueHeap(scanners, comparator);
+      if (!joinedScanners.isEmpty()) {
+        this.joinedHeap = new KeyValueHeap(joinedScanners, comparator);
+      }
     }
 
     RegionScannerImpl(Scan scan) throws IOException {
@@ -3527,6 +3563,43 @@ public class HRegion implements HeapSize
       return next(outResults, batch, metric);
     }
 
+    private void populateFromJoinedHeap(int limit, String metric) throws IOException {
+      assert joinedContinuationRow != null;
+      KeyValue kv = populateResult(this.joinedHeap, limit, joinedContinuationRow.getBuffer(),
+        joinedContinuationRow.getRowOffset(), joinedContinuationRow.getRowLength(), metric);
+      if (kv != KV_LIMIT) {
+        // We are done with this row, reset the continuation.
+        joinedContinuationRow = null;
+      }
+      // As the data is obtained from two independent heaps, we need to
+      // ensure that result list is sorted, because Result relies on that.
+      Collections.sort(results, comparator);
+    }
+
+    /**
+     * Fetches records with currentRow into results list, until next row or limit (if not -1).
+     * @param heap KeyValueHeap to fetch data from. It must be positioned on correct row before call.
+     * @param limit Max amount of KVs to place in result list, -1 means no limit.
+     * @param currentRow Byte array with key we are fetching.
+     * @param offset offset for currentRow
+     * @param length length for currentRow
+     * @param metric Metric key to be passed into KeyValueHeap::next().
+     * @return KV_LIMIT if limit reached, next KeyValue otherwise.
+     */
+    private KeyValue populateResult(KeyValueHeap heap, int limit, byte[] currentRow, int offset,
+        short length, String metric) throws IOException {
+      KeyValue nextKv;
+      do {
+        heap.next(results, limit - results.size(), metric);
+        if (limit > 0 && results.size() == limit) {
+          return KV_LIMIT;
+        }
+        nextKv = heap.peek();
+      } while (nextKv != null && nextKv.matchingRow(currentRow, offset, length));
+
+      return nextKv;
+    }
+
     /*
      * @return True if a filter rules the scanner is over, done.
      */
@@ -3536,6 +3609,11 @@ public class HRegion implements HeapSize
 
     private boolean nextInternal(int limit, String metric) throws IOException {
       RpcCallContext rpcCall = HBaseServer.getCurrentCall();
+      // The loop here is used only when at some point during the next we determine
+      // that due to effects of filters or otherwise, we have an empty row in the result.
+      // Then we loop and try again. Otherwise, we must get out on the first iteration via return,
+      // "true" if there's more data to read, "false" if there isn't (storeHeap is at a stop row,
+      // and joinedHeap has no more data to read for the last row (if set, joinedContinuationRow)).
       while (true) {
         if (rpcCall != null) {
           // If a user specifies a too-restrictive or too-slow scanner, the
@@ -3545,7 +3623,9 @@ public class HRegion implements HeapSize
           rpcCall.throwExceptionIfCallerDisconnected();
         }
 
+        // Let's see what we have in the storeHeap.
         KeyValue current = this.storeHeap.peek();
+
         byte[] currentRow = null;
         int offset = 0;
         short length = 0;
@@ -3554,38 +3634,47 @@ public class HRegion implements HeapSize
           offset = current.getRowOffset();
           length = current.getRowLength();
         }
-        if (isStopRow(currentRow, offset, length)) {
-          if (filter != null && filter.hasFilterRow()) {
-            filter.filterRow(results);
+        boolean stopRow = isStopRow(currentRow, offset, length);
+        // Check if we were getting data from the joinedHeap and hit the limit.
+        // If not, then it's main path - getting results from storeHeap.
+        if (joinedContinuationRow == null) {
+          // First, check if we are at a stop row. If so, there are no more results.
+          if (stopRow) {
+            if (filter != null && filter.hasFilterRow()) {
+              filter.filterRow(results);        
+            }
+            return false;
           }
 
-          return false;
-        } else if (filterRowKey(currentRow, offset, length)) {
-          nextRow(currentRow, offset, length);
-        } else {
-          KeyValue nextKv;
-          do {
-            this.storeHeap.next(results, limit - results.size(), metric);
-            if (limit > 0 && results.size() == limit) {
-              if (this.filter != null && filter.hasFilterRow()) {
-                throw new IncompatibleFilterException(
-                  "Filter whose hasFilterRow() returns true is incompatible with scan with limit!");
-              }
-              return true; // we are expecting more yes, but also limited to how many we can return.
-            }
-            nextKv = this.storeHeap.peek();
-          } while (nextKv != null && nextKv.matchingRow(currentRow, offset, length));
+          // Check if rowkey filter wants to exclude this row. If so, loop to next.
+          // Technically, if we hit limits before on this row, we don't need this call.
+          if (filterRowKey(currentRow, offset, length)) {
+            nextRow(currentRow, offset, length);
+            continue;
+          }
 
-          final boolean stopRow = nextKv == null || isStopRow(nextKv.getBuffer(), nextKv.getRowOffset(), nextKv.getRowLength());
+          KeyValue nextKv = populateResult(this.storeHeap, limit, currentRow, offset, length,
+            metric);
+          // Ok, we are good, let's try to get some results from the main heap.
+          if (nextKv == KV_LIMIT) {
+            if (this.filter != null && filter.hasFilterRow()) {
+              throw new IncompatibleFilterException(
+                "Filter whose hasFilterRow() returns true is incompatible with scan with limit!");
+            }
+            return true; // We hit the limit.
+          }
 
-          // now that we have an entire row, lets process with a filters:
+          stopRow = nextKv == null ||
+              isStopRow(nextKv.getBuffer(), nextKv.getRowOffset(), nextKv.getRowLength());
+          // save that the row was empty before filters applied to it.
+          final boolean isEmptyRow = results.isEmpty();
 
-          // first filter with the filterRow(List)
+          // We have the part of the row necessary for filtering (all of it, usually).
+          // First filter with the filterRow(List).            
           if (filter != null && filter.hasFilterRow()) {
             filter.filterRow(results);
           }
-
-          if (results.isEmpty()) {
+          if (isEmptyRow) {
             // this seems like a redundant step - we already consumed the row
             // there're no left overs.
             // the reasons for calling this method are:
@@ -3594,12 +3683,48 @@ public class HRegion implements HeapSize
             nextRow(currentRow, offset, length);
 
             // This row was totally filtered out, if this is NOT the last row,
-            // we should continue on.
-
+            // we should continue on. Otherwise, nothing else to do.
             if (!stopRow) continue;
+            return false;
+          }
+
+          // Ok, we are done with storeHeap for this row.
+          // Now we may need to fetch additional, non-essential data into row.
+          // These values are not needed for filter to work, so we postpone their
+          // fetch to (possibly) reduce amount of data loads from disk.
+          if (this.joinedHeap != null) {
+            KeyValue nextJoinedKv = joinedHeap.peek();
+            // If joinedHeap is pointing to some other row, try to seek to a correct one.
+            // We don't need to recheck that row here - populateResult will take care of that.
+            boolean mayHaveData =
+              (nextJoinedKv != null && nextJoinedKv.matchingRow(currentRow, offset, length))
+              || this.joinedHeap.seek(KeyValue.createFirstOnRow(currentRow, offset, length));
+            if (mayHaveData) {
+              joinedContinuationRow = current;
+              populateFromJoinedHeap(limit, metric);
+            }
           }
-          return !stopRow;
+        } else {
+          // Populating from the joined heap was stopped by limits, populate some more.
+          populateFromJoinedHeap(limit, metric);
         }
+
+        // We may have just called populateFromJoinedHeap and hit the limits. If that is
+        // the case, we need to call it again on the next next() invocation.
+        if (joinedContinuationRow != null) {
+          return true;
+        }
+
+        // Finally, we are done with both joinedHeap and storeHeap.
+        // Double check to prevent empty rows from appearing in result. It could be
+        // the case when SingleColumnValueExcludeFilter is used.
+        if (results.isEmpty()) {
+          nextRow(currentRow, offset, length);
+          if (!stopRow) continue;
+        }
+
+        // We are done. Return the result.
+        return !stopRow;
       }
     }
 
@@ -3609,8 +3734,9 @@ public class HRegion implements HeapSize
     }
 
     protected void nextRow(byte [] currentRow, int offset, short length) throws IOException {
+      assert this.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read.";
       KeyValue next;
-      while((next = this.storeHeap.peek()) != null && next.matchingRow(currentRow, offset, length)) {
+      while ((next = this.storeHeap.peek()) != null && next.matchingRow(currentRow, offset, length)) {
         this.storeHeap.next(MOCKED_LIST);
       }
       results.clear();
@@ -3621,7 +3747,7 @@ public class HRegion implements HeapSize
       return currentRow == null ||
           (stopRow != null &&
           comparator.compareRows(stopRow, 0, stopRow.length,
-              currentRow, offset, length) <= isScan);
+            currentRow, offset, length) <= isScan);
     }
 
     @Override
@@ -3630,6 +3756,10 @@ public class HRegion implements HeapSize
         storeHeap.close();
         storeHeap = null;
       }
+      if (joinedHeap != null) {
+        joinedHeap.close();
+        joinedHeap = null;
+      }
       // no need to sychronize here.
       scannerReadPoints.remove(this);
       this.filterClosed = true;
@@ -3644,16 +3774,21 @@ public class HRegion implements HeapSize
       if (row == null) {
         throw new IllegalArgumentException("Row cannot be null.");
       }
+      boolean result = false;
       startRegionOperation();
       try {
         // This could be a new thread from the last time we called next().
         MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);
         KeyValue kv = KeyValue.createFirstOnRow(row);
         // use request seek to make use of the lazy seek option. See HBASE-5520
-        return this.storeHeap.requestSeek(kv, true, true);
+        result = this.storeHeap.requestSeek(kv, true, true);
+        if (this.joinedHeap != null) {
+          result = this.joinedHeap.requestSeek(kv, true, true) || result;
+        }
       } finally {
         closeRegionOperation();
       }
+      return result;
     }
   }
 

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1431103&r1=1431102&r2=1431103&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed Jan  9 21:37:35 2013
@@ -2898,7 +2898,12 @@ public class  HRegionServer implements C
         } else {
           region = getRegion(request.getRegion());
           ClientProtos.Scan protoScan = request.getScan();
+          boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
           Scan scan = ProtobufUtil.toScan(protoScan);
+          // if the request doesn't set this, get the default region setting.
+          if (!isLoadingCfsOnDemandSet) {
+            scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
+          }
           region.prepareScanner(scan);
           if (region.getCoprocessorHost() != null) {
             scanner = region.getCoprocessorHost().preScannerOpen(scan);

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestSingleColumnValueExcludeFilter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestSingleColumnValueExcludeFilter.java?rev=1431103&r1=1431102&r2=1431103&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestSingleColumnValueExcludeFilter.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestSingleColumnValueExcludeFilter.java Wed Jan  9 21:37:35 2013
@@ -27,6 +27,9 @@ import org.junit.experimental.categories
 
 import static org.junit.Assert.*;
 
+import java.util.List;
+import java.util.ArrayList;
+
 /**
  * Tests for {@link SingleColumnValueExcludeFilter}. Because this filter
  * extends {@link SingleColumnValueFilter}, only the added functionality is
@@ -52,16 +55,18 @@ public class TestSingleColumnValueExclud
         CompareOp.EQUAL, VAL_1);
 
     // A 'match' situation
-    KeyValue kv;
-    kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1);
-    // INCLUDE expected because test column has not yet passed
-    assertTrue("otherColumn", filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE);
-    kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER, VAL_1);
-    // Test column will pass (will match), will SKIP because test columns are excluded
-    assertTrue("testedMatch", filter.filterKeyValue(kv) == Filter.ReturnCode.SKIP);
-    // Test column has already passed and matched, all subsequent columns are INCLUDE
-    kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1);
-    assertTrue("otherColumn", filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE);
+    List<KeyValue> kvs = new ArrayList<KeyValue>();
+    KeyValue kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1);
+
+    kvs.add (new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1));
+    kvs.add (new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER, VAL_1));
+    kvs.add (new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1));
+
+    filter.filterRow(kvs);
+
+    assertEquals("resultSize", kvs.size(), 2);
+    assertTrue("leftKV1", KeyValue.COMPARATOR.compare(kvs.get(0), kv) == 0);
+    assertTrue("leftKV2", KeyValue.COMPARATOR.compare(kvs.get(1), kv) == 0);
     assertFalse("allRemainingWhenMatch", filter.filterAllRemaining());
 
     // A 'mismatch' situation

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=1431103&r1=1431102&r2=1431103&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java Wed Jan  9 21:37:35 2013
@@ -67,9 +67,11 @@ import org.apache.hadoop.hbase.filter.Bi
 import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
 import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.NullComparator;
 import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
@@ -198,7 +200,7 @@ public class TestHRegion extends HBaseTe
     System.out.println(results);
     assertEquals(0, results.size());
   }
-  
+
   @Test
   public void testToShowNPEOnRegionScannerReseek() throws Exception{
     String method = "testToShowNPEOnRegionScannerReseek";
@@ -2491,6 +2493,166 @@ public class TestHRegion extends HBaseTe
     }
   }
 
+  /**
+   * Added for HBASE-5416
+   *
+   * Here we test scan optimization when only subset of CFs are used in filter
+   * conditions.
+   */
+  public void testScanner_JoinedScanners() throws IOException {
+    byte [] tableName = Bytes.toBytes("testTable");
+    byte [] cf_essential = Bytes.toBytes("essential");
+    byte [] cf_joined = Bytes.toBytes("joined");
+    byte [] cf_alpha = Bytes.toBytes("alpha");
+    this.region = initHRegion(tableName, getName(), conf, cf_essential, cf_joined, cf_alpha);
+    try {
+      byte [] row1 = Bytes.toBytes("row1");
+      byte [] row2 = Bytes.toBytes("row2");
+      byte [] row3 = Bytes.toBytes("row3");
+
+      byte [] col_normal = Bytes.toBytes("d");
+      byte [] col_alpha = Bytes.toBytes("a");
+
+      byte [] filtered_val = Bytes.toBytes(3);
+
+      Put put = new Put(row1);
+      put.add(cf_essential, col_normal, Bytes.toBytes(1));
+      put.add(cf_joined, col_alpha, Bytes.toBytes(1));
+      region.put(put);
+
+      put = new Put(row2);
+      put.add(cf_essential, col_alpha, Bytes.toBytes(2));
+      put.add(cf_joined, col_normal, Bytes.toBytes(2));
+      put.add(cf_alpha, col_alpha, Bytes.toBytes(2));
+      region.put(put);
+
+      put = new Put(row3);
+      put.add(cf_essential, col_normal, filtered_val);
+      put.add(cf_joined, col_normal, filtered_val);
+      region.put(put);
+
+      // Check two things:
+      // 1. result list contains expected values
+      // 2. result list is sorted properly
+
+      Scan scan = new Scan();
+      Filter filter = new SingleColumnValueExcludeFilter(cf_essential, col_normal,
+                                                         CompareOp.NOT_EQUAL, filtered_val);
+      scan.setFilter(filter);
+      scan.setLoadColumnFamiliesOnDemand(true);
+      InternalScanner s = region.getScanner(scan);
+
+      List<KeyValue> results = new ArrayList<KeyValue>();
+      assertTrue(s.next(results));
+      assertEquals(results.size(), 1);
+      results.clear();
+
+      assertTrue(s.next(results));
+      assertEquals(results.size(), 3);
+      assertTrue("orderCheck", results.get(0).matchingFamily(cf_alpha));
+      assertTrue("orderCheck", results.get(1).matchingFamily(cf_essential));
+      assertTrue("orderCheck", results.get(2).matchingFamily(cf_joined));
+      results.clear();
+
+      assertFalse(s.next(results));
+      assertEquals(results.size(), 0);
+    } finally {
+      HRegion.closeHRegion(this.region);
+      this.region = null;
+    }
+  }
+
+  /**
+   * HBASE-5416
+   *
+   * Test case when scan limits amount of KVs returned on each next() call.
+   */
+  public void testScanner_JoinedScannersWithLimits() throws IOException {
+    final byte [] tableName = Bytes.toBytes("testTable");
+    final byte [] cf_first = Bytes.toBytes("first");
+    final byte [] cf_second = Bytes.toBytes("second");
+
+    this.region = initHRegion(tableName, getName(), conf, cf_first, cf_second);
+    try {
+      final byte [] col_a = Bytes.toBytes("a");
+      final byte [] col_b = Bytes.toBytes("b");
+
+      Put put;
+
+      for (int i = 0; i < 10; i++) {
+        put = new Put(Bytes.toBytes("r" + Integer.toString(i)));
+        put.add(cf_first, col_a, Bytes.toBytes(i));
+        if (i < 5) {
+          put.add(cf_first, col_b, Bytes.toBytes(i));
+          put.add(cf_second, col_a, Bytes.toBytes(i));
+          put.add(cf_second, col_b, Bytes.toBytes(i));
+        }
+        region.put(put);
+      }
+
+      Scan scan = new Scan();
+      scan.setLoadColumnFamiliesOnDemand(true);
+      Filter bogusFilter = new FilterBase() {
+        @Override
+        public boolean isFamilyEssential(byte[] name) {
+          return Bytes.equals(name, cf_first);
+        }
+      };
+
+      scan.setFilter(bogusFilter);
+      InternalScanner s = region.getScanner(scan);
+
+      // Our data looks like this:
+      // r0: first:a, first:b, second:a, second:b
+      // r1: first:a, first:b, second:a, second:b
+      // r2: first:a, first:b, second:a, second:b
+      // r3: first:a, first:b, second:a, second:b
+      // r4: first:a, first:b, second:a, second:b
+      // r5: first:a
+      // r6: first:a
+      // r7: first:a
+      // r8: first:a
+      // r9: first:a
+
+      // But due to next's limit set to 3, we should get this:
+      // r0: first:a, first:b, second:a
+      // r0: second:b
+      // r1: first:a, first:b, second:a
+      // r1: second:b
+      // r2: first:a, first:b, second:a
+      // r2: second:b
+      // r3: first:a, first:b, second:a
+      // r3: second:b
+      // r4: first:a, first:b, second:a
+      // r4: second:b
+      // r5: first:a
+      // r6: first:a
+      // r7: first:a
+      // r8: first:a
+      // r9: first:a
+
+      List<KeyValue> results = new ArrayList<KeyValue>();
+      int index = 0;
+      while (true) {
+        boolean more = s.next(results, 3);
+        if ((index >> 1) < 5) {
+          if (index % 2 == 0)
+            assertEquals(results.size(), 3);
+          else
+            assertEquals(results.size(), 1);
+        }
+        else
+          assertEquals(results.size(), 1);
+        results.clear();
+        index++;
+        if (!more) break;
+      }
+    } finally {
+      HRegion.closeHRegion(this.region);
+      this.region = null;
+    }
+  }
+
   //////////////////////////////////////////////////////////////////////////////
   // Split test
   //////////////////////////////////////////////////////////////////////////////

Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestJoinedScanners.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestJoinedScanners.java?rev=1431103&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestJoinedScanners.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestJoinedScanners.java Wed Jan  9 21:37:35 2013
@@ -0,0 +1,191 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Assert;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+
+import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+
+
+/**
+ * Benchmark-style test for the joined scanners optimization:
+ * https://issues.apache.org/jira/browse/HBASE-5416
+ *
+ * <p>Loads a two-family table in which a one-byte flag is stored in the "essential" family and
+ * a large value is stored in the "joined" family. The table is then scanned repeatedly with a
+ * {@link SingleColumnValueFilter} on the flag, alternating
+ * {@link Scan#setLoadColumnFamiliesOnDemand(boolean)} between off and on. The elapsed time of
+ * every scan is logged; nothing is asserted.
+ */
+@Category(LargeTests.class)
+public class TestJoinedScanners {
+  static final Log LOG = LogFactory.getLog(TestJoinedScanners.class);
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static final String DIR = TEST_UTIL.getDataTestDir("TestJoinedScanners").toString();
+
+  private static final byte[] tableName = Bytes.toBytes("testTable");
+  private static final byte[] cf_essential = Bytes.toBytes("essential");
+  private static final byte[] cf_joined = Bytes.toBytes("joined");
+  private static final byte[] col_name = Bytes.toBytes("a");
+  private static final byte[] flag_yes = Bytes.toBytes("Y");
+  private static final byte[] flag_no  = Bytes.toBytes("N");
+
+  /**
+   * Starts a mini cluster, fills a table with rows whose "essential" flag is "Y" for
+   * {@code flag_percent} percent of the rows and "N" for the rest, and then runs 20 filtered
+   * scans, alternating between the plain ("slow") and the on-demand ("joined") mode.
+   *
+   * @throws Exception if the cluster cannot be started or any table operation fails
+   */
+  @Test
+  public void testJoinedScanners() throws Exception {
+    String[] dataNodeHosts = new String[] { "host1", "host2", "host3" };
+    int regionServersCount = 3;
+
+    HBaseTestingUtility htu = new HBaseTestingUtility();
+
+    final int DEFAULT_BLOCK_SIZE = 1024*1024;
+    htu.getConfiguration().setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+    htu.getConfiguration().setInt("dfs.replication", 1);
+    // 300 GB; NOTE(review): presumably large enough that the region never splits - confirm.
+    htu.getConfiguration().setLong("hbase.hregion.max.filesize", 322122547200L);
+    MiniHBaseCluster cluster = null;
+    HTable ht = null;
+
+    try {
+      cluster = htu.startMiniCluster(1, regionServersCount, dataNodeHosts);
+      byte [][] families = {cf_essential, cf_joined};
+
+      ht = htu.createTable(
+        Bytes.toBytes(this.getClass().getSimpleName()), families);
+
+      long rows_to_insert = 10000;
+      int insert_batch = 20;
+      int flag_percent = 1;
+      int large_bytes = 128 * 1024;
+      long time = System.nanoTime();
+
+      LOG.info("Make " + Long.toString(rows_to_insert) + " rows, total size = "
+        + Float.toString(rows_to_insert * large_bytes / 1024 / 1024) + " MB");
+
+      byte [] val_large = new byte[large_bytes];
+
+      List<Put> puts = new ArrayList<Put>();
+
+      for (long i = 0; i < rows_to_insert; i++) {
+        Put put = new Put(Bytes.toBytes(Long.toString (i)));
+        // Strict comparison: exactly flag_percent out of every 100 rows carry the "Y" flag.
+        if (i % 100 < flag_percent) {
+          put.add(cf_essential, col_name, flag_yes);
+        } else {
+          put.add(cf_essential, col_name, flag_no);
+        }
+        put.add(cf_joined, col_name, val_large);
+        puts.add(put);
+        if (puts.size() >= insert_batch) {
+          ht.put(puts);
+          puts.clear();
+        }
+      }
+      // Flush the last, partially filled batch; skip the call when nothing is left.
+      if (!puts.isEmpty()) {
+        ht.put(puts);
+        puts.clear();
+      }
+
+      LOG.info("Data generated in "
+        + Double.toString((System.nanoTime() - time) / 1000000000.0) + " seconds");
+
+      // Alternate between the two modes, starting with the slow one.
+      boolean slow = true;
+      for (int i = 0; i < 20; ++i) {
+        runScanner(ht, slow);
+        slow = !slow;
+      }
+    } finally {
+      // Release the table even when loading or scanning failed, then stop the cluster.
+      try {
+        if (ht != null) {
+          ht.close();
+        }
+      } finally {
+        if (cluster != null) {
+          htu.shutdownMiniCluster();
+        }
+      }
+    }
+  }
+
+  /**
+   * Scans both column families of {@code table}, keeping only rows whose "essential" flag
+   * equals "Y", and logs the elapsed time together with the number of rows returned.
+   *
+   * @param table table to scan
+   * @param slow  when {@code true} column families are not loaded on demand; when
+   *              {@code false} on-demand loading is enabled on the scan
+   * @throws Exception if the scan fails
+   */
+  private void runScanner(HTable table, boolean slow) throws Exception {
+    long time = System.nanoTime();
+    Scan scan = new Scan();
+    scan.addColumn(cf_essential, col_name);
+    scan.addColumn(cf_joined, col_name);
+
+    SingleColumnValueFilter filter = new SingleColumnValueFilter(
+        cf_essential, col_name, CompareFilter.CompareOp.EQUAL, flag_yes);
+    filter.setFilterIfMissing(true);
+    scan.setFilter(filter);
+    scan.setLoadColumnFamiliesOnDemand(!slow);
+
+    ResultScanner result_scanner = table.getScanner(scan);
+    long rows_count = 0;
+    double timeSec;
+    try {
+      // Each call to next() yields one row, so this counts rows directly.
+      while (result_scanner.next() != null) {
+        rows_count++;
+      }
+      // Stop the clock before closing so the close is not part of the measurement.
+      timeSec = (System.nanoTime() - time) / 1000000000.0;
+    } finally {
+      result_scanner.close();
+    }
+    LOG.info((slow ? "Slow" : "Joined") + " scanner finished in " + Double.toString(timeSec)
+      + " seconds, got " + Long.toString(rows_count) + " rows");
+  }
+
+  /**
+   * Creates a fresh region for a table with the given families under
+   * {@code DIR + callingMethod}, deleting that directory first if it already exists.
+   * Not called anywhere in this class at present.
+   *
+   * @param tableName     name of the table the region belongs to
+   * @param startKey      start key of the region
+   * @param stopKey       stop key of the region
+   * @param callingMethod suffix appended to the test data directory to form the region path
+   * @param conf          configuration used to obtain the file system and create the region
+   * @param families      column families to add to the table descriptor
+   * @return the newly created region
+   * @throws IOException if the existing directory cannot be deleted or region creation fails
+   */
+  private static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey,
+      String callingMethod, Configuration conf, byte[]... families)
+      throws IOException {
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    for(byte [] family : families) {
+      htd.addFamily(new HColumnDescriptor(family));
+    }
+    HRegionInfo info = new HRegionInfo(htd.getName(), startKey, stopKey, false);
+    Path path = new Path(DIR + callingMethod);
+    FileSystem fs = FileSystem.get(conf);
+    if (fs.exists(path)) {
+      if (!fs.delete(path, true)) {
+        throw new IOException("Failed delete of " + path);
+      }
+    }
+    return HRegion.createHRegion(info, path, conf, htd);
+  }
+}
\ No newline at end of file