Posted to commits@hbase.apache.org by te...@apache.org on 2013/01/09 22:37:36 UTC
svn commit: r1431103 - in /hbase/trunk:
hbase-common/src/main/java/org/apache/hadoop/hbase/
hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/
hbase-protocol/src/main/protobuf/
hbase-server/src/main/java/org/apache/hadoop/hbase/cl...
Author: tedyu
Date: Wed Jan 9 21:37:35 2013
New Revision: 1431103
URL: http://svn.apache.org/viewvc?rev=1431103&view=rev
Log:
HBASE-5416 Improve performance of scans with some kind of filters (Max Lapan and Sergey)
Added:
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestJoinedScanners.java
Modified:
hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
hbase/trunk/hbase-protocol/src/main/protobuf/Client.proto
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Scan.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/Filter.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterBase.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterList.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterWrapper.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueExcludeFilter.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestSingleColumnValueExcludeFilter.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java?rev=1431103&r1=1431102&r2=1431103&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java Wed Jan 9 21:37:35 2013
@@ -2049,6 +2049,19 @@ public class KeyValue implements Cell, H
}
/**
+ * Create a KeyValue that is smaller than all other possible KeyValues
+ * for the given row. That is, any (valid) KeyValue on 'row' would sort
+ * _after_ the result.
+ *
+ * @param row - row key (arbitrary byte array)
+ * @param roffset - offset of the row key within the array
+ * @param rlength - length of the row key within the array
+ * @return First possible KeyValue on passed <code>row</code>
+ */
+ public static KeyValue createFirstOnRow(final byte [] row, int roffset, short rlength) {
+ return new KeyValue(row, roffset, rlength,
+ null, 0, 0, null, 0, 0, HConstants.LATEST_TIMESTAMP, Type.Maximum, null, 0, 0);
+ }
+
+ /**
* Creates a KeyValue that is smaller than all other KeyValues that
* are older than the passed timestamp.
* @param row - row key (arbitrary byte array)
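For context, the new createFirstOnRow(byte[], int, short) overload builds a synthetic KeyValue of Type.Maximum so that every real KeyValue on the row sorts after it, which makes it a natural seek target. A minimal sketch of the seek pattern, mirroring how the joined heap in the HRegion changes below uses it (currentRow/offset/length stand for an in-buffer row key):

    // Seek to the first possible KeyValue on the current row; any real
    // KeyValue on that row sorts after this synthetic one.
    KeyValue seekKv = KeyValue.createFirstOnRow(currentRow, offset, length);
    boolean mayHaveData = joinedHeap.seek(seekKv);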
Modified: hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java?rev=1431103&r1=1431102&r2=1431103&view=diff
==============================================================================
--- hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java (original)
+++ hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java Wed Jan 9 21:37:35 2013
@@ -8992,6 +8992,10 @@ public final class ClientProtos {
// optional uint32 storeOffset = 12;
boolean hasStoreOffset();
int getStoreOffset();
+
+ // optional bool loadColumnFamiliesOnDemand = 13;
+ boolean hasLoadColumnFamiliesOnDemand();
+ boolean getLoadColumnFamiliesOnDemand();
}
public static final class Scan extends
com.google.protobuf.GeneratedMessage
@@ -9170,6 +9174,16 @@ public final class ClientProtos {
return storeOffset_;
}
+ // optional bool loadColumnFamiliesOnDemand = 13;
+ public static final int LOADCOLUMNFAMILIESONDEMAND_FIELD_NUMBER = 13;
+ private boolean loadColumnFamiliesOnDemand_;
+ public boolean hasLoadColumnFamiliesOnDemand() {
+ return ((bitField0_ & 0x00000400) == 0x00000400);
+ }
+ public boolean getLoadColumnFamiliesOnDemand() {
+ return loadColumnFamiliesOnDemand_;
+ }
+
private void initFields() {
column_ = java.util.Collections.emptyList();
attribute_ = java.util.Collections.emptyList();
@@ -9183,6 +9197,7 @@ public final class ClientProtos {
maxResultSize_ = 0L;
storeLimit_ = 0;
storeOffset_ = 0;
+ loadColumnFamiliesOnDemand_ = false;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -9250,6 +9265,9 @@ public final class ClientProtos {
if (((bitField0_ & 0x00000200) == 0x00000200)) {
output.writeUInt32(12, storeOffset_);
}
+ if (((bitField0_ & 0x00000400) == 0x00000400)) {
+ output.writeBool(13, loadColumnFamiliesOnDemand_);
+ }
getUnknownFields().writeTo(output);
}
@@ -9307,6 +9325,10 @@ public final class ClientProtos {
size += com.google.protobuf.CodedOutputStream
.computeUInt32Size(12, storeOffset_);
}
+ if (((bitField0_ & 0x00000400) == 0x00000400)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBoolSize(13, loadColumnFamiliesOnDemand_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -9384,6 +9406,11 @@ public final class ClientProtos {
result = result && (getStoreOffset()
== other.getStoreOffset());
}
+ result = result && (hasLoadColumnFamiliesOnDemand() == other.hasLoadColumnFamiliesOnDemand());
+ if (hasLoadColumnFamiliesOnDemand()) {
+ result = result && (getLoadColumnFamiliesOnDemand()
+ == other.getLoadColumnFamiliesOnDemand());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -9441,6 +9468,10 @@ public final class ClientProtos {
hash = (37 * hash) + STOREOFFSET_FIELD_NUMBER;
hash = (53 * hash) + getStoreOffset();
}
+ if (hasLoadColumnFamiliesOnDemand()) {
+ hash = (37 * hash) + LOADCOLUMNFAMILIESONDEMAND_FIELD_NUMBER;
+ hash = (53 * hash) + hashBoolean(getLoadColumnFamiliesOnDemand());
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
return hash;
}
@@ -9601,6 +9632,8 @@ public final class ClientProtos {
bitField0_ = (bitField0_ & ~0x00000400);
storeOffset_ = 0;
bitField0_ = (bitField0_ & ~0x00000800);
+ loadColumnFamiliesOnDemand_ = false;
+ bitField0_ = (bitField0_ & ~0x00001000);
return this;
}
@@ -9705,6 +9738,10 @@ public final class ClientProtos {
to_bitField0_ |= 0x00000200;
}
result.storeOffset_ = storeOffset_;
+ if (((from_bitField0_ & 0x00001000) == 0x00001000)) {
+ to_bitField0_ |= 0x00000400;
+ }
+ result.loadColumnFamiliesOnDemand_ = loadColumnFamiliesOnDemand_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -9803,6 +9840,9 @@ public final class ClientProtos {
if (other.hasStoreOffset()) {
setStoreOffset(other.getStoreOffset());
}
+ if (other.hasLoadColumnFamiliesOnDemand()) {
+ setLoadColumnFamiliesOnDemand(other.getLoadColumnFamiliesOnDemand());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -9922,6 +9962,11 @@ public final class ClientProtos {
storeOffset_ = input.readUInt32();
break;
}
+ case 104: {
+ bitField0_ |= 0x00001000;
+ loadColumnFamiliesOnDemand_ = input.readBool();
+ break;
+ }
}
}
}
@@ -10654,6 +10699,27 @@ public final class ClientProtos {
return this;
}
+ // optional bool loadColumnFamiliesOnDemand = 13;
+ private boolean loadColumnFamiliesOnDemand_ ;
+ public boolean hasLoadColumnFamiliesOnDemand() {
+ return ((bitField0_ & 0x00001000) == 0x00001000);
+ }
+ public boolean getLoadColumnFamiliesOnDemand() {
+ return loadColumnFamiliesOnDemand_;
+ }
+ public Builder setLoadColumnFamiliesOnDemand(boolean value) {
+ bitField0_ |= 0x00001000;
+ loadColumnFamiliesOnDemand_ = value;
+ onChanged();
+ return this;
+ }
+ public Builder clearLoadColumnFamiliesOnDemand() {
+ bitField0_ = (bitField0_ & ~0x00001000);
+ loadColumnFamiliesOnDemand_ = false;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:Scan)
}
@@ -21446,7 +21512,7 @@ public final class ClientProtos {
"\006region\030\001 \002(\0132\020.RegionSpecifier\022\027\n\006mutat" +
"e\030\002 \002(\0132\007.Mutate\022\035\n\tcondition\030\003 \001(\0132\n.Co" +
"ndition\"<\n\016MutateResponse\022\027\n\006result\030\001 \001(" +
- "\0132\007.Result\022\021\n\tprocessed\030\002 \001(\010\"\243\002\n\004Scan\022\027" +
+ "\0132\007.Result\022\021\n\tprocessed\030\002 \001(\010\"\307\002\n\004Scan\022\027" +
"\n\006column\030\001 \003(\0132\007.Column\022!\n\tattribute\030\002 \003" +
"(\0132\016.NameBytesPair\022\020\n\010startRow\030\003 \001(\014\022\017\n\007" +
"stopRow\030\004 \001(\014\022\027\n\006filter\030\005 \001(\0132\007.Filter\022\035" +
@@ -21454,50 +21520,50 @@ public final class ClientProtos {
"ions\030\007 \001(\r:\0011\022\031\n\013cacheBlocks\030\010 \001(\010:\004true" +
"\022\021\n\tbatchSize\030\t \001(\r\022\025\n\rmaxResultSize\030\n \001" +
"(\004\022\022\n\nstoreLimit\030\013 \001(\r\022\023\n\013storeOffset\030\014 " +
- "\001(\r\"\230\001\n\013ScanRequest\022 \n\006region\030\001 \001(\0132\020.Re" +
- "gionSpecifier\022\023\n\004scan\030\002 \001(\0132\005.Scan\022\021\n\tsc" +
- "annerId\030\003 \001(\004\022\024\n\014numberOfRows\030\004 \001(\r\022\024\n\014c" +
- "loseScanner\030\005 \001(\010\022\023\n\013nextCallSeq\030\006 \001(\004\"\\" +
- "\n\014ScanResponse\022\027\n\006result\030\001 \003(\0132\007.Result\022" +
- "\021\n\tscannerId\030\002 \001(\004\022\023\n\013moreResults\030\003 \001(\010\022" +
- "\013\n\003ttl\030\004 \001(\r\"?\n\016LockRowRequest\022 \n\006region",
- "\030\001 \002(\0132\020.RegionSpecifier\022\013\n\003row\030\002 \003(\014\".\n" +
- "\017LockRowResponse\022\016\n\006lockId\030\001 \002(\004\022\013\n\003ttl\030" +
- "\002 \001(\r\"D\n\020UnlockRowRequest\022 \n\006region\030\001 \002(" +
- "\0132\020.RegionSpecifier\022\016\n\006lockId\030\002 \002(\004\"\023\n\021U" +
- "nlockRowResponse\"\260\001\n\024BulkLoadHFileReques" +
- "t\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\0224\n\nf" +
- "amilyPath\030\002 \003(\0132 .BulkLoadHFileRequest.F" +
- "amilyPath\022\024\n\014assignSeqNum\030\003 \001(\010\032*\n\nFamil" +
- "yPath\022\016\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025B" +
- "ulkLoadHFileResponse\022\016\n\006loaded\030\001 \002(\010\"_\n\026",
- "CoprocessorServiceCall\022\013\n\003row\030\001 \002(\014\022\023\n\013s" +
- "erviceName\030\002 \002(\t\022\022\n\nmethodName\030\003 \002(\t\022\017\n\007" +
- "request\030\004 \002(\014\"d\n\031CoprocessorServiceReque" +
- "st\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022%\n\004" +
- "call\030\002 \002(\0132\027.CoprocessorServiceCall\"]\n\032C" +
- "oprocessorServiceResponse\022 \n\006region\030\001 \002(" +
- "\0132\020.RegionSpecifier\022\035\n\005value\030\002 \002(\0132\016.Nam" +
- "eBytesPair\"9\n\013MultiAction\022\027\n\006mutate\030\001 \001(" +
- "\0132\007.Mutate\022\021\n\003get\030\002 \001(\0132\004.Get\"I\n\014ActionR" +
- "esult\022\026\n\005value\030\001 \001(\0132\007.Result\022!\n\texcepti",
- "on\030\002 \001(\0132\016.NameBytesPair\"^\n\014MultiRequest" +
- "\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\034\n\006ac" +
- "tion\030\002 \003(\0132\014.MultiAction\022\016\n\006atomic\030\003 \001(\010" +
- "\".\n\rMultiResponse\022\035\n\006result\030\001 \003(\0132\r.Acti" +
- "onResult2\223\003\n\rClientService\022 \n\003get\022\013.GetR" +
- "equest\032\014.GetResponse\022)\n\006mutate\022\016.MutateR" +
- "equest\032\017.MutateResponse\022#\n\004scan\022\014.ScanRe" +
- "quest\032\r.ScanResponse\022,\n\007lockRow\022\017.LockRo" +
- "wRequest\032\020.LockRowResponse\0222\n\tunlockRow\022" +
- "\021.UnlockRowRequest\032\022.UnlockRowResponse\022>",
- "\n\rbulkLoadHFile\022\025.BulkLoadHFileRequest\032\026" +
- ".BulkLoadHFileResponse\022F\n\013execService\022\032." +
- "CoprocessorServiceRequest\032\033.CoprocessorS" +
- "erviceResponse\022&\n\005multi\022\r.MultiRequest\032\016" +
- ".MultiResponseBB\n*org.apache.hadoop.hbas" +
- "e.protobuf.generatedB\014ClientProtosH\001\210\001\001\240" +
- "\001\001"
+ "\001(\r\022\"\n\032loadColumnFamiliesOnDemand\030\r \001(\010\"" +
+ "\230\001\n\013ScanRequest\022 \n\006region\030\001 \001(\0132\020.Region" +
+ "Specifier\022\023\n\004scan\030\002 \001(\0132\005.Scan\022\021\n\tscanne" +
+ "rId\030\003 \001(\004\022\024\n\014numberOfRows\030\004 \001(\r\022\024\n\014close" +
+ "Scanner\030\005 \001(\010\022\023\n\013nextCallSeq\030\006 \001(\004\"\\\n\014Sc" +
+ "anResponse\022\027\n\006result\030\001 \003(\0132\007.Result\022\021\n\ts" +
+ "cannerId\030\002 \001(\004\022\023\n\013moreResults\030\003 \001(\010\022\013\n\003t",
+ "tl\030\004 \001(\r\"?\n\016LockRowRequest\022 \n\006region\030\001 \002" +
+ "(\0132\020.RegionSpecifier\022\013\n\003row\030\002 \003(\014\".\n\017Loc" +
+ "kRowResponse\022\016\n\006lockId\030\001 \002(\004\022\013\n\003ttl\030\002 \001(" +
+ "\r\"D\n\020UnlockRowRequest\022 \n\006region\030\001 \002(\0132\020." +
+ "RegionSpecifier\022\016\n\006lockId\030\002 \002(\004\"\023\n\021Unloc" +
+ "kRowResponse\"\260\001\n\024BulkLoadHFileRequest\022 \n" +
+ "\006region\030\001 \002(\0132\020.RegionSpecifier\0224\n\nfamil" +
+ "yPath\030\002 \003(\0132 .BulkLoadHFileRequest.Famil" +
+ "yPath\022\024\n\014assignSeqNum\030\003 \001(\010\032*\n\nFamilyPat" +
+ "h\022\016\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkL",
+ "oadHFileResponse\022\016\n\006loaded\030\001 \002(\010\"_\n\026Copr" +
+ "ocessorServiceCall\022\013\n\003row\030\001 \002(\014\022\023\n\013servi" +
+ "ceName\030\002 \002(\t\022\022\n\nmethodName\030\003 \002(\t\022\017\n\007requ" +
+ "est\030\004 \002(\014\"d\n\031CoprocessorServiceRequest\022 " +
+ "\n\006region\030\001 \002(\0132\020.RegionSpecifier\022%\n\004call" +
+ "\030\002 \002(\0132\027.CoprocessorServiceCall\"]\n\032Copro" +
+ "cessorServiceResponse\022 \n\006region\030\001 \002(\0132\020." +
+ "RegionSpecifier\022\035\n\005value\030\002 \002(\0132\016.NameByt" +
+ "esPair\"9\n\013MultiAction\022\027\n\006mutate\030\001 \001(\0132\007." +
+ "Mutate\022\021\n\003get\030\002 \001(\0132\004.Get\"I\n\014ActionResul",
+ "t\022\026\n\005value\030\001 \001(\0132\007.Result\022!\n\texception\030\002" +
+ " \001(\0132\016.NameBytesPair\"^\n\014MultiRequest\022 \n\006" +
+ "region\030\001 \002(\0132\020.RegionSpecifier\022\034\n\006action" +
+ "\030\002 \003(\0132\014.MultiAction\022\016\n\006atomic\030\003 \001(\010\".\n\r" +
+ "MultiResponse\022\035\n\006result\030\001 \003(\0132\r.ActionRe" +
+ "sult2\223\003\n\rClientService\022 \n\003get\022\013.GetReque" +
+ "st\032\014.GetResponse\022)\n\006mutate\022\016.MutateReque" +
+ "st\032\017.MutateResponse\022#\n\004scan\022\014.ScanReques" +
+ "t\032\r.ScanResponse\022,\n\007lockRow\022\017.LockRowReq" +
+ "uest\032\020.LockRowResponse\0222\n\tunlockRow\022\021.Un",
+ "lockRowRequest\032\022.UnlockRowResponse\022>\n\rbu" +
+ "lkLoadHFile\022\025.BulkLoadHFileRequest\032\026.Bul" +
+ "kLoadHFileResponse\022F\n\013execService\022\032.Copr" +
+ "ocessorServiceRequest\032\033.CoprocessorServi" +
+ "ceResponse\022&\n\005multi\022\r.MultiRequest\032\016.Mul" +
+ "tiResponseBB\n*org.apache.hadoop.hbase.pr" +
+ "otobuf.generatedB\014ClientProtosH\001\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -21597,7 +21663,7 @@ public final class ClientProtos {
internal_static_Scan_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_Scan_descriptor,
- new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", },
+ new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", },
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.class,
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.Builder.class);
internal_static_ScanRequest_descriptor =
Modified: hbase/trunk/hbase-protocol/src/main/protobuf/Client.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-protocol/src/main/protobuf/Client.proto?rev=1431103&r1=1431102&r2=1431103&view=diff
==============================================================================
--- hbase/trunk/hbase-protocol/src/main/protobuf/Client.proto (original)
+++ hbase/trunk/hbase-protocol/src/main/protobuf/Client.proto Wed Jan 9 21:37:35 2013
@@ -186,6 +186,7 @@ message Scan {
optional uint64 maxResultSize = 10;
optional uint32 storeLimit = 11;
optional uint32 storeOffset = 12;
+ optional bool loadColumnFamiliesOnDemand = 13; /* DO NOT add defaults to loadColumnFamiliesOnDemand. */
}
/**
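Because the new proto field is optional with no default, the generated message keeps presence information, letting the server distinguish "unset" from "set to false" (the HRegionServer hunk below relies on exactly this). A small sketch against the generated builder:

    // hasLoadColumnFamiliesOnDemand() is false until the field is set,
    // even though getLoadColumnFamiliesOnDemand() would already return false.
    ClientProtos.Scan.Builder builder = ClientProtos.Scan.newBuilder();
    assert !builder.hasLoadColumnFamiliesOnDemand();
    builder.setLoadColumnFamiliesOnDemand(false); // now present, still false
    assert builder.hasLoadColumnFamiliesOnDemand();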
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Scan.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Scan.java?rev=1431103&r1=1431102&r2=1431103&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Scan.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Scan.java Wed Jan 9 21:37:35 2013
@@ -92,7 +92,7 @@ public class Scan extends OperationWithA
private int storeLimit = -1;
private int storeOffset = 0;
-
+
// If application wants to collect scan metrics, it needs to
// call scan.setAttribute(SCAN_ATTRIBUTES_ENABLE, Bytes.toBytes(Boolean.TRUE))
static public final String SCAN_ATTRIBUTES_METRICS_ENABLE =
@@ -110,6 +110,7 @@ public class Scan extends OperationWithA
private TimeRange tr = new TimeRange();
private Map<byte [], NavigableSet<byte []>> familyMap =
new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR);
+ private Boolean loadColumnFamiliesOnDemand = null;
/**
* Create a Scan operation across all rows.
@@ -159,6 +160,7 @@ public class Scan extends OperationWithA
maxResultSize = scan.getMaxResultSize();
cacheBlocks = scan.getCacheBlocks();
filter = scan.getFilter(); // clone?
+ loadColumnFamiliesOnDemand = scan.getLoadColumnFamiliesOnDemandValue();
TimeRange ctr = scan.getTimeRange();
tr = new TimeRange(ctr.getMin(), ctr.getMax());
Map<byte[], NavigableSet<byte[]>> fams = scan.getFamilyMap();
@@ -519,6 +521,41 @@ public class Scan extends OperationWithA
}
/**
+ * Set the value indicating whether loading CFs on demand should be allowed (cluster
+ * default is false). On-demand CF loading doesn't load column families until necessary, e.g.
+ * if you filter on one column, the data for the other column families will be loaded only
+ * for the rows that are included in the result, not for all rows as in the normal case.
+ * With column-specific filters, like SingleColumnValueFilter w/filterIfMissing == true,
+ * this can deliver huge perf gains when there's a cf with lots of data; however, it can
+ * also lead to some inconsistent results, as follows:
+ * - if someone does a concurrent update to both column families in question, you may get a
+ * row that never existed, e.g. for { rowKey = 5, { cat_videos => 1 }, { video => "my cat" } },
+ * if someone puts rowKey 5 with { cat_videos => 0 }, { video => "my dog" }, a concurrent scan
+ * filtering on "cat_videos == 1" can get { rowKey = 5, { cat_videos => 1 },
+ * { video => "my dog" } }.
+ * - if there's a concurrent split and you have more than 2 column families, some rows may be
+ * missing some column families.
+ */
+ public void setLoadColumnFamiliesOnDemand(boolean value) {
+ this.loadColumnFamiliesOnDemand = value;
+ }
+
+ /**
+ * Get the raw loadColumnFamiliesOnDemand setting; may be null if it has not been set.
+ */
+ public Boolean getLoadColumnFamiliesOnDemandValue() {
+ return this.loadColumnFamiliesOnDemand;
+ }
+
+ /**
+ * Get the logical value indicating whether on-demand CF loading should be allowed.
+ */
+ public boolean doLoadColumnFamiliesOnDemand() {
+ return (this.loadColumnFamiliesOnDemand != null)
+ && this.loadColumnFamiliesOnDemand.booleanValue();
+ }
+
+ /**
* Compile the table and column family (i.e. schema) information
* into a String. Useful for parsing and aggregation by debugging,
* logging, and administration tools.
@@ -547,7 +584,7 @@ public class Scan extends OperationWithA
* Useful for debugging, logging, and administration tools.
* @param maxCols a limit on the number of columns output prior to truncation
* @return Map
- */
+ */
@Override
public Map<String, Object> toMap(int maxCols) {
// start with the fingerpring map and build on top of it
@@ -564,6 +601,7 @@ public class Scan extends OperationWithA
map.put("caching", this.caching);
map.put("maxResultSize", this.maxResultSize);
map.put("cacheBlocks", this.cacheBlocks);
+ map.put("loadColumnFamiliesOnDemand", this.loadColumnFamiliesOnDemand);
List<Long> timeRange = new ArrayList<Long>();
timeRange.add(this.tr.getMin());
timeRange.add(this.tr.getMax());
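The client-side API mirrors that tri-state: the raw value is a Boolean that stays null until explicitly set, while the logical value defaults to false. A minimal sketch using only the methods added above:

    Scan scan = new Scan();
    // Unset: raw value is null, logical value is false.
    assert scan.getLoadColumnFamiliesOnDemandValue() == null;
    assert !scan.doLoadColumnFamiliesOnDemand();
    scan.setLoadColumnFamiliesOnDemand(true);
    // Set: both the raw and the logical value now report true.
    assert Boolean.TRUE.equals(scan.getLoadColumnFamiliesOnDemandValue());
    assert scan.doLoadColumnFamiliesOnDemand();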
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/Filter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/Filter.java?rev=1431103&r1=1431102&r2=1431103&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/Filter.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/Filter.java Wed Jan 9 21:37:35 2013
@@ -172,6 +172,14 @@ public abstract class Filter {
abstract public KeyValue getNextKeyHint(final KeyValue currentKV);
/**
+ * Check whether the given column family is essential for the filter to check
+ * the row. Most filters always return true here. But some could have more
+ * sophisticated logic which could significantly reduce the scanning process
+ * by not even touching a column family until we are 100% sure that its data
+ * is needed in the result.
+ */
+ abstract public boolean isFamilyEssential(byte[] name);
+
+ /**
* @return The filter serialized using pb
*/
abstract public byte [] toByteArray();
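A custom filter that only ever inspects one family can override the new hook so all other families are read lazily. A minimal sketch, in the same spirit as the anonymous FilterBase subclass used in the TestHRegion changes below (the "first" family name is illustrative):

    Filter onlyFirstCfIsEssential = new FilterBase() {
      @Override
      public boolean isFamilyEssential(byte[] name) {
        // Only the "first" family is needed to evaluate this filter; data
        // from the other families can be joined in on demand, per row.
        return Bytes.equals(name, Bytes.toBytes("first"));
      }
    };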
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterBase.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterBase.java?rev=1431103&r1=1431102&r2=1431103&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterBase.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterBase.java Wed Jan 9 21:37:35 2013
@@ -134,6 +134,16 @@ public abstract class FilterBase extends
}
/**
+ * By default, we require all of the scan's column families to be present.
+ * Our subclasses may be more precise.
+ *
+ * @inheritDoc
+ */
+ public boolean isFamilyEssential(byte[] name) {
+ return true;
+ }
+
+ /**
* Given the filter's arguments it constructs the filter
* <p>
* @param filterArguments the filter's arguments
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterList.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterList.java?rev=1431103&r1=1431102&r2=1431103&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterList.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterList.java Wed Jan 9 21:37:35 2013
@@ -361,6 +361,16 @@ public class FilterList extends Filter {
}
@Override
+ public boolean isFamilyEssential(byte[] name) {
+ for (Filter filter : filters) {
+ if (filter.isFamilyEssential(name)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
public String toString() {
return toString(MAX_LOG_FILTERS);
}
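Note the OR semantics here: FilterList conservatively treats a family as essential as soon as any member filter needs it. For example (the member filters are hypothetical stand-ins for column-specific filters on two different families):

    // If filterOnA needs family "a" and filterOnB needs family "b", then
    // both "a" and "b" are essential for the combined filter, regardless
    // of whether the list is MUST_PASS_ALL or MUST_PASS_ONE.
    FilterList list = new FilterList(FilterList.Operator.MUST_PASS_ONE,
        Arrays.asList(filterOnA, filterOnB));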
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterWrapper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterWrapper.java?rev=1431103&r1=1431102&r2=1431103&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterWrapper.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterWrapper.java Wed Jan 9 21:37:35 2013
@@ -136,6 +136,11 @@ public class FilterWrapper extends Filte
}
}
+ @Override
+ public boolean isFamilyEssential(byte[] name) {
+ return filter.isFamilyEssential(name);
+ };
+
/**
* @param other
* @return true if and only if the fields of the filter that are serialized
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueExcludeFilter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueExcludeFilter.java?rev=1431103&r1=1431102&r2=1431103&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueExcludeFilter.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueExcludeFilter.java Wed Jan 9 21:37:35 2013
@@ -31,6 +31,8 @@ import com.google.protobuf.InvalidProtoc
import java.io.IOException;
import java.util.ArrayList;
+import java.util.List;
+import java.util.Iterator;
/**
* A {@link Filter} that checks a single column value, but does not emit the
@@ -96,16 +98,22 @@ public class SingleColumnValueExcludeFil
matchedColumn,filterIfMissing,latestVersionOnly);
}
- public ReturnCode filterKeyValue(KeyValue keyValue) {
- ReturnCode superRetCode = super.filterKeyValue(keyValue);
- if (superRetCode == ReturnCode.INCLUDE) {
+ // We clean the result row in filterRow() to be consistent with the scanning process.
+ public boolean hasFilterRow() {
+ return true;
+ }
+
+ // Here we remove all KeyValues belonging to the tested column from the row.
+ public void filterRow(List<KeyValue> kvs) {
+ Iterator it = kvs.iterator();
+ while (it.hasNext()) {
+ KeyValue kv = (KeyValue)it.next();
// If the current column is actually the tested column,
// we will skip it instead.
- if (keyValue.matchingColumn(this.columnFamily, this.columnQualifier)) {
- return ReturnCode.SKIP;
+ if (kv.matchingColumn(this.columnFamily, this.columnQualifier)) {
+ it.remove();
}
}
- return superRetCode;
}
public static Filter createFilterFromArguments(ArrayList<byte []> filterArguments) {
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java?rev=1431103&r1=1431102&r2=1431103&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java Wed Jan 9 21:37:35 2013
@@ -379,6 +379,15 @@ public class SingleColumnValueFilter ext
&& this.getLatestVersionOnly() == other.getLatestVersionOnly();
}
+ /**
+ * The only CF this filter needs is the given column family. So, it's the only
+ * essential column family in the whole scan. If filterIfMissing == false, all
+ * families are essential, because otherwise rows without any data in the
+ * filtered CF could wrongly be skipped.
+ */
+ public boolean isFamilyEssential(byte[] name) {
+ return !this.filterIfMissing || Bytes.equals(name, this.columnFamily);
+ }
+
@Override
public String toString() {
return String.format("%s (%s, %s, %s, %s)",
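Put differently, with filterIfMissing enabled the filtered family alone drives the filter's decision, so it is the only essential one. A quick sketch (family and column names are illustrative):

    SingleColumnValueFilter f = new SingleColumnValueFilter(
        Bytes.toBytes("essential"), Bytes.toBytes("a"),
        CompareOp.EQUAL, Bytes.toBytes("Y"));
    f.setFilterIfMissing(true);
    assert f.isFamilyEssential(Bytes.toBytes("essential"));
    assert !f.isFamilyEssential(Bytes.toBytes("joined"));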
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java?rev=1431103&r1=1431102&r2=1431103&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java Wed Jan 9 21:37:35 2013
@@ -138,6 +138,10 @@ public class SkipFilter extends FilterBa
return getFilter().areSerializedFieldsEqual(other.getFilter());
}
+ public boolean isFamilyEssential(byte[] name) {
+ return filter.isFamilyEssential(name);
+ }
+
@Override
public String toString() {
return this.getClass().getSimpleName() + " " + this.filter.toString();
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java?rev=1431103&r1=1431102&r2=1431103&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java Wed Jan 9 21:37:35 2013
@@ -138,6 +138,10 @@ public class WhileMatchFilter extends Fi
return getFilter().areSerializedFieldsEqual(other.getFilter());
}
+ public boolean isFamilyEssential(byte[] name) {
+ return filter.isFamilyEssential(name);
+ }
+
@Override
public String toString() {
return this.getClass().getSimpleName() + " " + this.filter.toString();
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java?rev=1431103&r1=1431102&r2=1431103&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java Wed Jan 9 21:37:35 2013
@@ -565,6 +565,10 @@ public final class ProtobufUtil {
if (scan.getMaxResultSize() > 0) {
scanBuilder.setMaxResultSize(scan.getMaxResultSize());
}
+ Boolean loadColumnFamiliesOnDemand = scan.getLoadColumnFamiliesOnDemandValue();
+ if (loadColumnFamiliesOnDemand != null) {
+ scanBuilder.setLoadColumnFamiliesOnDemand(loadColumnFamiliesOnDemand.booleanValue());
+ }
scanBuilder.setMaxVersions(scan.getMaxVersions());
TimeRange timeRange = scan.getTimeRange();
if (!timeRange.isAllTime()) {
@@ -648,6 +652,9 @@ public final class ProtobufUtil {
if (proto.hasStoreOffset()) {
scan.setRowOffsetPerColumnFamily(proto.getStoreOffset());
}
+ if (proto.hasLoadColumnFamiliesOnDemand()) {
+ scan.setLoadColumnFamiliesOnDemand(proto.getLoadColumnFamiliesOnDemand());
+ }
if (proto.hasTimeRange()) {
HBaseProtos.TimeRange timeRange = proto.getTimeRange();
long minStamp = 0;
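The conversion preserves the tri-state in both directions: the flag is copied into the proto only when the raw Boolean is non-null, and copied back into the Scan only when the proto reports the field as present. Assuming the two converters above are the toScan pair in ProtobufUtil (the HRegionServer hunk below shows the proto-to-Scan direction), a round trip of an unset flag would look like:

    Scan clientScan = new Scan();                         // flag never set
    ClientProtos.Scan proto = ProtobufUtil.toScan(clientScan);
    assert !proto.hasLoadColumnFamiliesOnDemand();        // stays unset
    Scan serverScan = ProtobufUtil.toScan(proto);
    assert serverScan.getLoadColumnFamiliesOnDemandValue() == null;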
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1431103&r1=1431102&r2=1431103&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Wed Jan 9 21:37:35 2013
@@ -184,6 +184,9 @@ public class HRegion implements HeapSize
public static final Log LOG = LogFactory.getLog(HRegion.class);
private static final String MERGEDIR = ".merges";
+ public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY =
+ "hbase.hregion.scan.loadColumnFamiliesOnDemand";
+
final AtomicBoolean closed = new AtomicBoolean(false);
/* Closing can take some time; use the closing flag if there is stuff we don't
* want to do while in closing state; e.g. like offer this region up to the
@@ -281,6 +284,13 @@ public class HRegion implements HeapSize
private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
/**
+ * The default setting for whether to enable on-demand CF loading for
+ * scan requests to this region. Requests can override it.
+ */
+ private boolean isLoadingCfsOnDemandDefault = false;
+
+
+ /**
* @return The smallest mvcc readPoint across all the scanners in this
* region. Writes older than this readPoint, are included in every
* read operation.
@@ -455,6 +465,8 @@ public class HRegion implements HeapSize
.add(htd.getValues());
this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
DEFAULT_ROWLOCK_WAIT_DURATION);
+
+ this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, false);
this.regionInfo = regionInfo;
this.htableDescriptor = htd;
this.rsServices = rsServices;
@@ -915,6 +927,10 @@ public class HRegion implements HeapSize
return mvcc;
}
+ public boolean isLoadingCfsOnDemandDefault() {
+ return this.isLoadingCfsOnDemandDefault;
+ }
+
/**
* Close down this HRegion. Flush the cache, shut down each HStore, don't
* service any more calls.
@@ -3383,6 +3399,15 @@ public class HRegion implements HeapSize
class RegionScannerImpl implements RegionScanner {
// Package local for testability
KeyValueHeap storeHeap = null;
+ /** Heap of key-values that are not essential for the provided filters and are thus read
+ * on demand, if on-demand column family loading is enabled.*/
+ KeyValueHeap joinedHeap = null;
+ /**
+ * If the joined heap data gathering is interrupted due to scan limits, this will
+ * contain the row for which we are populating the values.*/
+ private KeyValue joinedContinuationRow = null;
+ // KeyValue indicating that limit is reached when scanning
+ private final KeyValue KV_LIMIT = new KeyValue();
private final byte [] stopRow;
private Filter filter;
private List<KeyValue> results = new ArrayList<KeyValue>();
@@ -3429,7 +3454,10 @@ public class HRegion implements HeapSize
scannerReadPoints.put(this, this.readPt);
}
+ // Here we separate all scanners into two lists - scanners that provide data required
+ // by the filter to operate (scanners list) and all others (joinedScanners list).
List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
+ List<KeyValueScanner> joinedScanners = new ArrayList<KeyValueScanner>();
if (additionalScanners != null) {
scanners.addAll(additionalScanners);
}
@@ -3438,9 +3466,17 @@ public class HRegion implements HeapSize
scan.getFamilyMap().entrySet()) {
Store store = stores.get(entry.getKey());
KeyValueScanner scanner = store.getScanner(scan, entry.getValue());
- scanners.add(scanner);
+ if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
+ || this.filter.isFamilyEssential(entry.getKey())) {
+ scanners.add(scanner);
+ } else {
+ joinedScanners.add(scanner);
+ }
}
this.storeHeap = new KeyValueHeap(scanners, comparator);
+ if (!joinedScanners.isEmpty()) {
+ this.joinedHeap = new KeyValueHeap(joinedScanners, comparator);
+ }
}
RegionScannerImpl(Scan scan) throws IOException {
@@ -3527,6 +3563,43 @@ public class HRegion implements HeapSize
return next(outResults, batch, metric);
}
+ private void populateFromJoinedHeap(int limit, String metric) throws IOException {
+ assert joinedContinuationRow != null;
+ KeyValue kv = populateResult(this.joinedHeap, limit, joinedContinuationRow.getBuffer(),
+ joinedContinuationRow.getRowOffset(), joinedContinuationRow.getRowLength(), metric);
+ if (kv != KV_LIMIT) {
+ // We are done with this row, reset the continuation.
+ joinedContinuationRow = null;
+ }
+ // As the data is obtained from two independent heaps, we need to
+ // ensure that result list is sorted, because Result relies on that.
+ Collections.sort(results, comparator);
+ }
+
+ /**
+ * Fetches records with currentRow into the results list, until the next row or limit (if not -1).
+ * @param heap KeyValueHeap to fetch data from. It must be positioned on the correct row before the call.
+ * @param limit Max amount of KVs to place in result list, -1 means no limit.
+ * @param currentRow Byte array with key we are fetching.
+ * @param offset offset for currentRow
+ * @param length length for currentRow
+ * @param metric Metric key to be passed into KeyValueHeap::next().
+ * @return KV_LIMIT if limit reached, next KeyValue otherwise.
+ */
+ private KeyValue populateResult(KeyValueHeap heap, int limit, byte[] currentRow, int offset,
+ short length, String metric) throws IOException {
+ KeyValue nextKv;
+ do {
+ heap.next(results, limit - results.size(), metric);
+ if (limit > 0 && results.size() == limit) {
+ return KV_LIMIT;
+ }
+ nextKv = heap.peek();
+ } while (nextKv != null && nextKv.matchingRow(currentRow, offset, length));
+
+ return nextKv;
+ }
+
/*
* @return True if a filter rules the scanner is over, done.
*/
@@ -3536,6 +3609,11 @@ public class HRegion implements HeapSize
private boolean nextInternal(int limit, String metric) throws IOException {
RpcCallContext rpcCall = HBaseServer.getCurrentCall();
+ // The loop here is used only when, at some point during the next() call, we determine
+ // that due to effects of filters or otherwise, we have an empty row in the result.
+ // Then we loop and try again. Otherwise, we must get out on the first iteration via return:
+ // "true" if there's more data to read, "false" if there isn't (storeHeap is at a stop row,
+ // and joinedHeap has no more data to read for the last row, joinedContinuationRow, if set).
while (true) {
if (rpcCall != null) {
// If a user specifies a too-restrictive or too-slow scanner, the
@@ -3545,7 +3623,9 @@ public class HRegion implements HeapSize
rpcCall.throwExceptionIfCallerDisconnected();
}
+ // Let's see what we have in the storeHeap.
KeyValue current = this.storeHeap.peek();
+
byte[] currentRow = null;
int offset = 0;
short length = 0;
@@ -3554,38 +3634,47 @@ public class HRegion implements HeapSize
offset = current.getRowOffset();
length = current.getRowLength();
}
- if (isStopRow(currentRow, offset, length)) {
- if (filter != null && filter.hasFilterRow()) {
- filter.filterRow(results);
+ boolean stopRow = isStopRow(currentRow, offset, length);
+ // Check if we were getting data from the joinedHeap and hit the limit.
+ // If not, then it's the main path - getting results from the storeHeap.
+ if (joinedContinuationRow == null) {
+ // First, check if we are at a stop row. If so, there are no more results.
+ if (stopRow) {
+ if (filter != null && filter.hasFilterRow()) {
+ filter.filterRow(results);
+ }
+ return false;
}
- return false;
- } else if (filterRowKey(currentRow, offset, length)) {
- nextRow(currentRow, offset, length);
- } else {
- KeyValue nextKv;
- do {
- this.storeHeap.next(results, limit - results.size(), metric);
- if (limit > 0 && results.size() == limit) {
- if (this.filter != null && filter.hasFilterRow()) {
- throw new IncompatibleFilterException(
- "Filter whose hasFilterRow() returns true is incompatible with scan with limit!");
- }
- return true; // we are expecting more yes, but also limited to how many we can return.
- }
- nextKv = this.storeHeap.peek();
- } while (nextKv != null && nextKv.matchingRow(currentRow, offset, length));
+ // Check if rowkey filter wants to exclude this row. If so, loop to next.
+ // Technically, if we hit limits before on this row, we don't need this call.
+ if (filterRowKey(currentRow, offset, length)) {
+ nextRow(currentRow, offset, length);
+ continue;
+ }
- final boolean stopRow = nextKv == null || isStopRow(nextKv.getBuffer(), nextKv.getRowOffset(), nextKv.getRowLength());
+ KeyValue nextKv = populateResult(this.storeHeap, limit, currentRow, offset, length,
+ metric);
+ // Ok, we are good, let's try to get some results from the main heap.
+ if (nextKv == KV_LIMIT) {
+ if (this.filter != null && filter.hasFilterRow()) {
+ throw new IncompatibleFilterException(
+ "Filter whose hasFilterRow() returns true is incompatible with scan with limit!");
+ }
+ return true; // We hit the limit.
+ }
- // now that we have an entire row, lets process with a filters:
+ stopRow = nextKv == null ||
+ isStopRow(nextKv.getBuffer(), nextKv.getRowOffset(), nextKv.getRowLength());
+ // Save whether the row was empty before filters were applied to it.
+ final boolean isEmptyRow = results.isEmpty();
- // first filter with the filterRow(List)
+ // We have the part of the row necessary for filtering (all of it, usually).
+ // First filter with the filterRow(List).
if (filter != null && filter.hasFilterRow()) {
filter.filterRow(results);
}
-
- if (results.isEmpty()) {
+ if (isEmptyRow) {
// this seems like a redundant step - we already consumed the row
// there're no left overs.
// the reasons for calling this method are:
@@ -3594,12 +3683,48 @@ public class HRegion implements HeapSize
nextRow(currentRow, offset, length);
// This row was totally filtered out, if this is NOT the last row,
- // we should continue on.
-
+ // we should continue on. Otherwise, nothing else to do.
if (!stopRow) continue;
+ return false;
+ }
+
+ // Ok, we are done with storeHeap for this row.
+ // Now we may need to fetch additional, non-essential data into the row.
+ // These values are not needed for the filter to work, so we postpone their
+ // fetch to (possibly) reduce the amount of data loaded from disk.
+ if (this.joinedHeap != null) {
+ KeyValue nextJoinedKv = joinedHeap.peek();
+ // If joinedHeap is pointing to some other row, try to seek to a correct one.
+ // We don't need to recheck that row here - populateResult will take care of that.
+ boolean mayHaveData =
+ (nextJoinedKv != null && nextJoinedKv.matchingRow(currentRow, offset, length))
+ || this.joinedHeap.seek(KeyValue.createFirstOnRow(currentRow, offset, length));
+ if (mayHaveData) {
+ joinedContinuationRow = current;
+ populateFromJoinedHeap(limit, metric);
+ }
}
- return !stopRow;
+ } else {
+ // Populating from the joined heap was stopped by limits, populate some more.
+ populateFromJoinedHeap(limit, metric);
}
+
+ // We may have just called populateFromJoinedHeap and hit the limits. If that is
+ // the case, we need to call it again on the next next() invocation.
+ if (joinedContinuationRow != null) {
+ return true;
+ }
+
+ // Finally, we are done with both joinedHeap and storeHeap.
+ // Double check to prevent empty rows from appearing in the result. This can
+ // happen when SingleColumnValueExcludeFilter is used.
+ if (results.isEmpty()) {
+ nextRow(currentRow, offset, length);
+ if (!stopRow) continue;
+ }
+
+ // We are done. Return the result.
+ return !stopRow;
}
}
@@ -3609,8 +3734,9 @@ public class HRegion implements HeapSize
}
protected void nextRow(byte [] currentRow, int offset, short length) throws IOException {
+ assert this.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read.";
KeyValue next;
- while((next = this.storeHeap.peek()) != null && next.matchingRow(currentRow, offset, length)) {
+ while ((next = this.storeHeap.peek()) != null && next.matchingRow(currentRow, offset, length)) {
this.storeHeap.next(MOCKED_LIST);
}
results.clear();
@@ -3621,7 +3747,7 @@ public class HRegion implements HeapSize
return currentRow == null ||
(stopRow != null &&
comparator.compareRows(stopRow, 0, stopRow.length,
- currentRow, offset, length) <= isScan);
+ currentRow, offset, length) <= isScan);
}
@Override
@@ -3630,6 +3756,10 @@ public class HRegion implements HeapSize
storeHeap.close();
storeHeap = null;
}
+ if (joinedHeap != null) {
+ joinedHeap.close();
+ joinedHeap = null;
+ }
// no need to synchronize here.
scannerReadPoints.remove(this);
this.filterClosed = true;
@@ -3644,16 +3774,21 @@ public class HRegion implements HeapSize
if (row == null) {
throw new IllegalArgumentException("Row cannot be null.");
}
+ boolean result = false;
startRegionOperation();
try {
// This could be a new thread from the last time we called next().
MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);
KeyValue kv = KeyValue.createFirstOnRow(row);
// use request seek to make use of the lazy seek option. See HBASE-5520
- return this.storeHeap.requestSeek(kv, true, true);
+ result = this.storeHeap.requestSeek(kv, true, true);
+ if (this.joinedHeap != null) {
+ result = this.joinedHeap.requestSeek(kv, true, true) || result;
+ }
} finally {
closeRegionOperation();
}
+ return result;
}
}
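Operators can flip the region-level default for all scans that do not set the flag themselves; the per-request Scan setting still wins when present. A sketch of setting it programmatically (the same key works in hbase-site.xml):

    Configuration conf = HBaseConfiguration.create();
    // Region default for scans that leave loadColumnFamiliesOnDemand unset.
    conf.setBoolean(HRegion.LOAD_CFS_ON_DEMAND_CONFIG_KEY, true);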
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1431103&r1=1431102&r2=1431103&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed Jan 9 21:37:35 2013
@@ -2898,7 +2898,12 @@ public class HRegionServer implements C
} else {
region = getRegion(request.getRegion());
ClientProtos.Scan protoScan = request.getScan();
+ boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
Scan scan = ProtobufUtil.toScan(protoScan);
+ // if the request doesn't set this, get the default region setting.
+ if (!isLoadingCfsOnDemandSet) {
+ scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
+ }
region.prepareScanner(scan);
if (region.getCoprocessorHost() != null) {
scanner = region.getCoprocessorHost().preScannerOpen(scan);
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestSingleColumnValueExcludeFilter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestSingleColumnValueExcludeFilter.java?rev=1431103&r1=1431102&r2=1431103&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestSingleColumnValueExcludeFilter.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestSingleColumnValueExcludeFilter.java Wed Jan 9 21:37:35 2013
@@ -27,6 +27,9 @@ import org.junit.experimental.categories
import static org.junit.Assert.*;
+import java.util.List;
+import java.util.ArrayList;
+
/**
* Tests for {@link SingleColumnValueExcludeFilter}. Because this filter
* extends {@link SingleColumnValueFilter}, only the added functionality is
@@ -52,16 +55,18 @@ public class TestSingleColumnValueExclud
CompareOp.EQUAL, VAL_1);
// A 'match' situation
- KeyValue kv;
- kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1);
- // INCLUDE expected because test column has not yet passed
- assertTrue("otherColumn", filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE);
- kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER, VAL_1);
- // Test column will pass (will match), will SKIP because test columns are excluded
- assertTrue("testedMatch", filter.filterKeyValue(kv) == Filter.ReturnCode.SKIP);
- // Test column has already passed and matched, all subsequent columns are INCLUDE
- kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1);
- assertTrue("otherColumn", filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE);
+ List<KeyValue> kvs = new ArrayList<KeyValue>();
+ KeyValue kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1);
+
+ kvs.add (new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1));
+ kvs.add (new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER, VAL_1));
+ kvs.add (new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1));
+
+ filter.filterRow(kvs);
+
+ assertEquals("resultSize", kvs.size(), 2);
+ assertTrue("leftKV1", KeyValue.COMPARATOR.compare(kvs.get(0), kv) == 0);
+ assertTrue("leftKV2", KeyValue.COMPARATOR.compare(kvs.get(1), kv) == 0);
assertFalse("allRemainingWhenMatch", filter.filterAllRemaining());
// A 'mismatch' situation
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=1431103&r1=1431102&r2=1431103&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java Wed Jan 9 21:37:35 2013
@@ -67,9 +67,11 @@ import org.apache.hadoop.hbase.filter.Bi
import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.NullComparator;
import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
@@ -198,7 +200,7 @@ public class TestHRegion extends HBaseTe
System.out.println(results);
assertEquals(0, results.size());
}
-
+
@Test
public void testToShowNPEOnRegionScannerReseek() throws Exception{
String method = "testToShowNPEOnRegionScannerReseek";
@@ -2491,6 +2493,166 @@ public class TestHRegion extends HBaseTe
}
}
+ /**
+ * Added for HBASE-5416
+ *
+ * Here we test the scan optimization when only a subset of CFs is used in
+ * the filter conditions.
+ */
+ public void testScanner_JoinedScanners() throws IOException {
+ byte [] tableName = Bytes.toBytes("testTable");
+ byte [] cf_essential = Bytes.toBytes("essential");
+ byte [] cf_joined = Bytes.toBytes("joined");
+ byte [] cf_alpha = Bytes.toBytes("alpha");
+ this.region = initHRegion(tableName, getName(), conf, cf_essential, cf_joined, cf_alpha);
+ try {
+ byte [] row1 = Bytes.toBytes("row1");
+ byte [] row2 = Bytes.toBytes("row2");
+ byte [] row3 = Bytes.toBytes("row3");
+
+ byte [] col_normal = Bytes.toBytes("d");
+ byte [] col_alpha = Bytes.toBytes("a");
+
+ byte [] filtered_val = Bytes.toBytes(3);
+
+ Put put = new Put(row1);
+ put.add(cf_essential, col_normal, Bytes.toBytes(1));
+ put.add(cf_joined, col_alpha, Bytes.toBytes(1));
+ region.put(put);
+
+ put = new Put(row2);
+ put.add(cf_essential, col_alpha, Bytes.toBytes(2));
+ put.add(cf_joined, col_normal, Bytes.toBytes(2));
+ put.add(cf_alpha, col_alpha, Bytes.toBytes(2));
+ region.put(put);
+
+ put = new Put(row3);
+ put.add(cf_essential, col_normal, filtered_val);
+ put.add(cf_joined, col_normal, filtered_val);
+ region.put(put);
+
+ // Check two things:
+ // 1. result list contains expected values
+ // 2. result list is sorted properly
+
+ Scan scan = new Scan();
+ Filter filter = new SingleColumnValueExcludeFilter(cf_essential, col_normal,
+ CompareOp.NOT_EQUAL, filtered_val);
+ scan.setFilter(filter);
+ scan.setLoadColumnFamiliesOnDemand(true);
+ InternalScanner s = region.getScanner(scan);
+
+ List<KeyValue> results = new ArrayList<KeyValue>();
+ assertTrue(s.next(results));
+ assertEquals(results.size(), 1);
+ results.clear();
+
+ assertTrue(s.next(results));
+ assertEquals(results.size(), 3);
+ assertTrue("orderCheck", results.get(0).matchingFamily(cf_alpha));
+ assertTrue("orderCheck", results.get(1).matchingFamily(cf_essential));
+ assertTrue("orderCheck", results.get(2).matchingFamily(cf_joined));
+ results.clear();
+
+ assertFalse(s.next(results));
+ assertEquals(results.size(), 0);
+ } finally {
+ HRegion.closeHRegion(this.region);
+ this.region = null;
+ }
+ }
+
+ /**
+ * HBASE-5416
+ *
+ * Test case where the scan limits the number of KVs returned on each next() call.
+ */
+ public void testScanner_JoinedScannersWithLimits() throws IOException {
+ final byte [] tableName = Bytes.toBytes("testTable");
+ final byte [] cf_first = Bytes.toBytes("first");
+ final byte [] cf_second = Bytes.toBytes("second");
+
+ this.region = initHRegion(tableName, getName(), conf, cf_first, cf_second);
+ try {
+ final byte [] col_a = Bytes.toBytes("a");
+ final byte [] col_b = Bytes.toBytes("b");
+
+ Put put;
+
+ for (int i = 0; i < 10; i++) {
+ put = new Put(Bytes.toBytes("r" + Integer.toString(i)));
+ put.add(cf_first, col_a, Bytes.toBytes(i));
+ if (i < 5) {
+ put.add(cf_first, col_b, Bytes.toBytes(i));
+ put.add(cf_second, col_a, Bytes.toBytes(i));
+ put.add(cf_second, col_b, Bytes.toBytes(i));
+ }
+ region.put(put);
+ }
+
+ Scan scan = new Scan();
+ scan.setLoadColumnFamiliesOnDemand(true);
+ Filter bogusFilter = new FilterBase() {
+ @Override
+ public boolean isFamilyEssential(byte[] name) {
+ return Bytes.equals(name, cf_first);
+ }
+ };
+
+ scan.setFilter(bogusFilter);
+ InternalScanner s = region.getScanner(scan);
+
+ // Our data looks like this:
+ // r0: first:a, first:b, second:a, second:b
+ // r1: first:a, first:b, second:a, second:b
+ // r2: first:a, first:b, second:a, second:b
+ // r3: first:a, first:b, second:a, second:b
+ // r4: first:a, first:b, second:a, second:b
+ // r5: first:a
+ // r6: first:a
+ // r7: first:a
+ // r8: first:a
+ // r9: first:a
+
+ // But due to next's limit set to 3, we should get this:
+ // r0: first:a, first:b, second:a
+ // r0: second:b
+ // r1: first:a, first:b, second:a
+ // r1: second:b
+ // r2: first:a, first:b, second:a
+ // r2: second:b
+ // r3: first:a, first:b, second:a
+ // r3: second:b
+ // r4: first:a, first:b, second:a
+ // r4: second:b
+ // r5: first:a
+ // r6: first:a
+ // r7: first:a
+ // r8: first:a
+ // r9: first:a
+
+ List<KeyValue> results = new ArrayList<KeyValue>();
+ int index = 0;
+ while (true) {
+ boolean more = s.next(results, 3);
+ if ((index >> 1) < 5) {
+ if (index % 2 == 0)
+ assertEquals(results.size(), 3);
+ else
+ assertEquals(results.size(), 1);
+ }
+ else
+ assertEquals(results.size(), 1);
+ results.clear();
+ index++;
+ if (!more) break;
+ }
+ } finally {
+ HRegion.closeHRegion(this.region);
+ this.region = null;
+ }
+ }
+
//////////////////////////////////////////////////////////////////////////////
// Split test
//////////////////////////////////////////////////////////////////////////////
Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestJoinedScanners.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestJoinedScanners.java?rev=1431103&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestJoinedScanners.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestJoinedScanners.java Wed Jan 9 21:37:35 2013
@@ -0,0 +1,191 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Assert;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+
+import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+
+
+/**
+ * Test performance improvement of joined scanners optimization:
+ * https://issues.apache.org/jira/browse/HBASE-5416
+ */
+@Category(LargeTests.class)
+public class TestJoinedScanners {
+ static final Log LOG = LogFactory.getLog(TestJoinedScanners.class);
+
+ private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private static final String DIR = TEST_UTIL.getDataTestDir("TestJoinedScanners").toString();
+
+ private static final byte[] tableName = Bytes.toBytes("testTable");
+ private static final byte[] cf_essential = Bytes.toBytes("essential");
+ private static final byte[] cf_joined = Bytes.toBytes("joined");
+ private static final byte[] col_name = Bytes.toBytes("a");
+ private static final byte[] flag_yes = Bytes.toBytes("Y");
+ private static final byte[] flag_no = Bytes.toBytes("N");
+
+ @Test
+ public void testJoinedScanners() throws Exception {
+ String dataNodeHosts[] = new String[] { "host1", "host2", "host3" };
+ int regionServersCount = 3;
+
+ HBaseTestingUtility htu = new HBaseTestingUtility();
+
+ final int DEFAULT_BLOCK_SIZE = 1024*1024;
+ htu.getConfiguration().setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+ htu.getConfiguration().setInt("dfs.replication", 1);
+ htu.getConfiguration().setLong("hbase.hregion.max.filesize", 322122547200L);
+ MiniHBaseCluster cluster = null;
+
+ try {
+ cluster = htu.startMiniCluster(1, regionServersCount, dataNodeHosts);
+ byte [][] families = {cf_essential, cf_joined};
+
+ HTable ht = htu.createTable(
+ Bytes.toBytes(this.getClass().getSimpleName()), families);
+
+ long rows_to_insert = 10000;
+ int insert_batch = 20;
+ int flag_percent = 1;
+ int large_bytes = 128 * 1024;
+ long time = System.nanoTime();
+
+ LOG.info("Make " + Long.toString(rows_to_insert) + " rows, total size = "
+ + Float.toString(rows_to_insert * large_bytes / 1024 / 1024) + " MB");
+
+ byte [] val_large = new byte[large_bytes];
+
+ List<Put> puts = new ArrayList<Put>();
+
+ for (long i = 0; i < rows_to_insert; i++) {
+ Put put = new Put(Bytes.toBytes(Long.toString (i)));
+ if (i % 100 <= flag_percent) {
+ put.add(cf_essential, col_name, flag_yes);
+ }
+ else {
+ put.add(cf_essential, col_name, flag_no);
+ }
+ put.add(cf_joined, col_name, val_large);
+ puts.add(put);
+ if (puts.size() >= insert_batch) {
+ ht.put(puts);
+ puts.clear();
+ }
+ }
+ if (puts.size() > 0) {
+ ht.put(puts);
+ puts.clear();
+ }
+
+ LOG.info("Data generated in "
+ + Double.toString((System.nanoTime() - time) / 1000000000.0) + " seconds");
+
+ boolean slow = true;
+ for (int i = 0; i < 20; ++i) {
+ runScanner(ht, slow);
+ slow = !slow;
+ }
+
+ ht.close();
+ } finally {
+ if (cluster != null) {
+ htu.shutdownMiniCluster();
+ }
+ }
+ }
+
+ private void runScanner(HTable table, boolean slow) throws Exception {
+ long time = System.nanoTime();
+ Scan scan = new Scan();
+ scan.addColumn(cf_essential, col_name);
+ scan.addColumn(cf_joined, col_name);
+
+ SingleColumnValueFilter filter = new SingleColumnValueFilter(
+ cf_essential, col_name, CompareFilter.CompareOp.EQUAL, flag_yes);
+ filter.setFilterIfMissing(true);
+ scan.setFilter(filter);
+ scan.setLoadColumnFamiliesOnDemand(!slow);
+
+ ResultScanner result_scanner = table.getScanner(scan);
+ Result res;
+ long rows_count = 0;
+ while ((res = result_scanner.next()) != null) {
+ rows_count++;
+ }
+
+ double timeSec = (System.nanoTime() - time) / 1000000000.0;
+ result_scanner.close();
+ LOG.info((slow ? "Slow" : "Joined") + " scanner finished in " + Double.toString(timeSec)
+ + " seconds, got " + Long.toString(rows_count/2) + " rows");
+ }
+
+ private static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey,
+ String callingMethod, Configuration conf, byte[]... families)
+ throws IOException {
+ HTableDescriptor htd = new HTableDescriptor(tableName);
+ for(byte [] family : families) {
+ htd.addFamily(new HColumnDescriptor(family));
+ }
+ HRegionInfo info = new HRegionInfo(htd.getName(), startKey, stopKey, false);
+ Path path = new Path(DIR + callingMethod);
+ FileSystem fs = FileSystem.get(conf);
+ if (fs.exists(path)) {
+ if (!fs.delete(path, true)) {
+ throw new IOException("Failed delete of " + path);
+ }
+ }
+ return HRegion.createHRegion(info, path, conf, htd);
+ }
+}
\ No newline at end of file