You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by la...@apache.org on 2013/02/02 05:46:49 UTC
svn commit: r1441701 - in
/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase:
coprocessor/BaseRegionObserver.java coprocessor/RegionObserver.java
regionserver/HRegion.java regionserver/RegionCoprocessorHost.java
Author: larsh
Date: Sat Feb 2 04:46:48 2013
New Revision: 1441701
URL: http://svn.apache.org/viewvc?rev=1441701&view=rev
Log:
HBASE-5664 CP hooks in Scan flow for fast forward when filter filters out a row (Anoop Sam John)
Modified:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java?rev=1441701&r1=1441700&r2=1441701&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java Sat Feb 2 04:46:48 2013
@@ -291,6 +291,12 @@ public abstract class BaseRegionObserver
}
@Override
+ public boolean postScannerFilterRow(final ObserverContext<RegionCoprocessorEnvironment> e,
+ final InternalScanner s, final byte[] currentRow, final boolean hasMore) throws IOException {
+ return hasMore;
+ }
+
+ @Override
public void preScannerClose(final ObserverContext<RegionCoprocessorEnvironment> e,
final InternalScanner s) throws IOException {
}
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java?rev=1441701&r1=1441700&r2=1441701&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java Sat Feb 2 04:46:48 2013
@@ -697,6 +697,27 @@ public interface RegionObserver extends
throws IOException;
/**
+ * This will be called by the scan flow when the current scanned row is being filtered out by the
+ * filter. The filter may be filtering out the row via any of the below scenarios
+ * <ol>
+ * <li>
+ * <code>boolean filterRowKey(byte [] buffer, int offset, int length)</code> returning true</li>
+ * <li>
+ * <code>boolean filterRow()</code> returning true</li>
+ * <li>
+ * <code>void filterRow(List<KeyValue> kvs)</code> removing all the kvs from the passed List</li>
+ * </ol>
+ * @param c the environment provided by the region server
+ * @param s the scanner
+ * @param currentRow The current rowkey which got filtered out
+ * @param hasMore the 'has more' indication
+ * @return whether more rows are available for the scanner or not
+ * @throws IOException
+ */
+ boolean postScannerFilterRow(final ObserverContext<RegionCoprocessorEnvironment> c,
+ final InternalScanner s, final byte[] currentRow, final boolean hasMore) throws IOException;
+
+ /**
* Called before the client closes a scanner.
* <p>
* Call CoprocessorEnvironment#bypass to skip default actions
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1441701&r1=1441700&r2=1441701&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sat Feb 2 04:46:48 2013
@@ -1710,7 +1710,7 @@ public class HRegion implements HeapSize
protected RegionScanner instantiateRegionScanner(Scan scan,
List<KeyValueScanner> additionalScanners) throws IOException {
- return new RegionScannerImpl(scan, additionalScanners);
+ return new RegionScannerImpl(scan, additionalScanners, this);
}
/*
@@ -3525,13 +3525,16 @@ public class HRegion implements HeapSize
private int isScan;
private boolean filterClosed = false;
private long readPt;
+ private HRegion region;
public HRegionInfo getRegionInfo() {
return regionInfo;
}
- RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners) throws IOException {
- //DebugPrint.println("HRegionScanner.<init>");
-
+
+ RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
+ throws IOException {
+ // DebugPrint.println("HRegionScanner.<init>");
+ this.region = region;
this.filter = scan.getFilter();
this.batch = scan.getBatch();
if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
@@ -3582,8 +3585,8 @@ public class HRegion implements HeapSize
}
}
- RegionScannerImpl(Scan scan) throws IOException {
- this(scan, null);
+ RegionScannerImpl(Scan scan, HRegion region) throws IOException {
+ this(scan, null, region);
}
@Override
@@ -3749,7 +3752,8 @@ public class HRegion implements HeapSize
// Check if rowkey filter wants to exclude this row. If so, loop to next.
// Techically, if we hit limits before on this row, we don't need this call.
if (filterRowKey(currentRow, offset, length)) {
- nextRow(currentRow, offset, length);
+ boolean moreRows = nextRow(currentRow, offset, length);
+ if (!moreRows) return false;
continue;
}
@@ -3778,7 +3782,8 @@ public class HRegion implements HeapSize
// the reasons for calling this method are:
// 1. reset the filters.
// 2. provide a hook to fast forward the row (used by subclasses)
- nextRow(currentRow, offset, length);
+ boolean moreRows = nextRow(currentRow, offset, length);
+ if (!moreRows) return false;
// This row was totally filtered out, if this is NOT the last row,
// we should continue on. Otherwise, nothing else to do.
@@ -3818,7 +3823,8 @@ public class HRegion implements HeapSize
// Double check to prevent empty rows from appearing in result. It could be
// the case when SingleValueExcludeFilter is used.
if (results.isEmpty()) {
- nextRow(currentRow, offset, length);
+ boolean moreRows = nextRow(currentRow, offset, length);
+ if (!moreRows) return false;
if (!stopRow) continue;
}
@@ -3836,13 +3842,18 @@ public class HRegion implements HeapSize
&& filter.filterRowKey(row, offset, length);
}
- protected void nextRow(byte [] currentRow, int offset, short length) throws IOException {
+ protected boolean nextRow(byte [] currentRow, int offset, short length) throws IOException {
KeyValue next;
while((next = this.storeHeap.peek()) != null && next.matchingRow(currentRow, offset, length)) {
this.storeHeap.next(MOCKED_LIST);
}
results.clear();
resetFilters();
+ // Calling the hook in CP which allows it to do a fast forward
+ if (this.region.getCoprocessorHost() != null) {
+ return this.region.getCoprocessorHost().postScannerFilterRow(this, currentRow);
+ }
+ return true;
}
private boolean isStopRow(byte [] currentRow, int offset, short length) {
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java?rev=1441701&r1=1441700&r2=1441701&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java Sat Feb 2 04:46:48 2013
@@ -1330,6 +1330,35 @@ public class RegionCoprocessorHost
}
/**
+ * This will be called by the scan flow when the current scanned row is being filtered out by the
+ * filter.
+ * @param s the scanner
+ * @param currentRow The current rowkey which got filtered out
+ * @return whether more rows are available for the scanner or not
+ * @throws IOException
+ */
+ public boolean postScannerFilterRow(final InternalScanner s, final byte[] currentRow)
+ throws IOException {
+ boolean hasMore = true; // By default assume more rows there.
+ ObserverContext<RegionCoprocessorEnvironment> ctx = null;
+ for (RegionEnvironment env : coprocessors) {
+ if (env.getInstance() instanceof RegionObserver) {
+ ctx = ObserverContext.createAndPrepare(env, ctx);
+ try {
+ hasMore = ((RegionObserver) env.getInstance()).postScannerFilterRow(ctx, s, currentRow,
+ hasMore);
+ } catch (Throwable e) {
+ handleCoprocessorThrowable(env, e);
+ }
+ if (ctx.shouldComplete()) {
+ break;
+ }
+ }
+ }
+ return hasMore;
+ }
+
+ /**
* @param s the scanner
* @return true if default behavior should be bypassed, false otherwise
* @exception IOException Exception