You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by la...@apache.org on 2012/12/11 06:46:29 UTC
svn commit: r1420004 - in /hbase/trunk/hbase-server/src:
main/java/org/apache/hadoop/hbase/regionserver/
test/java/org/apache/hadoop/hbase/coprocessor/
Author: larsh
Date: Tue Dec 11 05:46:27 2012
New Revision: 1420004
URL: http://svn.apache.org/viewvc?rev=1420004&view=rev
Log:
HBASE-7180 RegionScannerImpl.next() is inefficient.
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1420004&r1=1420003&r2=1420004&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Dec 11 05:46:27 2012
@@ -3467,6 +3467,10 @@ public class HRegion implements HeapSize
return maxResultSize;
}
+ @Override
+ public long getMvccReadPoint() {
+ return this.readPt;
+ }
/**
* Reset both the filter and the old filter.
*/
@@ -3477,7 +3481,7 @@ public class HRegion implements HeapSize
}
@Override
- public synchronized boolean next(List<KeyValue> outResults, int limit)
+ public boolean next(List<KeyValue> outResults, int limit)
throws IOException {
return next(outResults, limit, null);
}
@@ -3497,30 +3501,42 @@ public class HRegion implements HeapSize
// This could be a new thread from the last time we called next().
MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);
- results.clear();
-
- boolean returnResult = nextInternal(limit, metric);
-
- outResults.addAll(results);
- resetFilters();
- if (isFilterDone()) {
- return false;
- }
- return returnResult;
+ return nextRaw(outResults, limit, metric);
} finally {
closeRegionOperation();
}
}
@Override
- public synchronized boolean next(List<KeyValue> outResults)
+ public boolean nextRaw(List<KeyValue> outResults)
+ throws IOException {
+ return nextRaw(outResults, batch, null);
+ }
+
+ @Override
+ public boolean nextRaw(List<KeyValue> outResults, int limit,
+ String metric) throws IOException {
+ results.clear();
+
+ boolean returnResult = nextInternal(limit, metric);
+
+ outResults.addAll(results);
+ resetFilters();
+ if (isFilterDone()) {
+ return false;
+ }
+ return returnResult;
+ }
+
+ @Override
+ public boolean next(List<KeyValue> outResults)
throws IOException {
// apply the batching limit by default
return next(outResults, batch, null);
}
@Override
- public synchronized boolean next(List<KeyValue> outResults, String metric)
+ public boolean next(List<KeyValue> outResults, String metric)
throws IOException {
// apply the batching limit by default
return next(outResults, batch, metric);
@@ -5274,7 +5290,7 @@ public class HRegion implements HeapSize
* @throws RegionTooBusyException if failed to get the lock in time
* @throws InterruptedIOException if interrupted while waiting for a lock
*/
- private void startRegionOperation()
+ public void startRegionOperation()
throws NotServingRegionException, RegionTooBusyException, InterruptedIOException {
if (this.closing.get()) {
throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
@@ -5292,7 +5308,7 @@ public class HRegion implements HeapSize
* Closes the lock. This needs to be called in the finally block corresponding
* to the try block of #startRegionOperation
*/
- private void closeRegionOperation() {
+ public void closeRegionOperation() {
lock.readLock().unlock();
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1420004&r1=1420003&r2=1420004&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Dec 11 05:46:27 2012
@@ -2912,20 +2912,30 @@ public class HRegionServer implements C
maxResultSize = maxScannerResultSize;
}
List<KeyValue> values = new ArrayList<KeyValue>();
- for (int i = 0; i < rows
- && currentScanResultSize < maxResultSize; i++) {
- // Collect values to be returned here
- boolean moreRows = scanner.next(values);
- if (!values.isEmpty()) {
- for (KeyValue kv : values) {
- currentScanResultSize += kv.heapSize();
+ MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint());
+ region.startRegionOperation();
+ try {
+ int i = 0;
+ synchronized(scanner) {
+ for (; i < rows
+ && currentScanResultSize < maxResultSize; i++) {
+ // Collect values to be returned here
+ boolean moreRows = scanner.nextRaw(values);
+ if (!values.isEmpty()) {
+ for (KeyValue kv : values) {
+ currentScanResultSize += kv.heapSize();
+ }
+ results.add(new Result(values));
+ }
+ if (!moreRows) {
+ break;
+ }
+ values.clear();
}
- results.add(new Result(values));
}
- if (!moreRows) {
- break;
- }
- values.clear();
+ region.readRequestsCount.add(i);
+ } finally {
+ region.closeRegionOperation();
}
// coprocessor postNext hook
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java?rev=1420004&r1=1420003&r2=1420004&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java Tue Dec 11 05:46:27 2012
@@ -19,9 +19,11 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
+import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
/**
@@ -56,4 +58,50 @@ public interface RegionScanner extends I
* @return The preferred max buffersize. See {@link Scan#setMaxResultSize(long)}
*/
public long getMaxResultSize();
+
+ /**
+ * @return The Scanner's MVCC read point (readPt); see {@link MultiVersionConsistencyControl}
+ */
+ public long getMvccReadPoint();
+
+ /**
+ * Grab the next row's worth of values with the default limit on the number of values
+ * to return.
+ * This is a special internal method to be called from coprocessor hooks to avoid expensive setup.
+ * Caller must set the thread's readpoint, start and close a region operation, and synchronize on the scanner object.
+ * See {@link #nextRaw(List, int, String)}
+ * @param result return output array
+ * @return true if more rows exist after this one, false if scanner is done
+ * @throws IOException e
+ */
+ public boolean nextRaw(List<KeyValue> result) throws IOException;
+
+ /**
+ * Grab the next row's worth of values with a limit on the number of values
+ * to return.
+ * This is a special internal method to be called from coprocessor hooks to avoid expensive setup.
+ * Caller must set the thread's readpoint, start and close a region operation, and synchronize on the scanner object.
+ * Example:
+ * <code><pre>
+ * HRegion region = ...;
+ * RegionScanner scanner = ...
+ * MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint());
+ * region.startRegionOperation();
+ * try {
+ * synchronized(scanner) {
+ * ...
+ * boolean moreRows = scanner.nextRaw(values);
+ * ...
+ * }
+ * } finally {
+ * region.closeRegionOperation();
+ * }
+ * </pre></code>
+ * @param result return output array
+ * @param limit limit on the number of values to return
+ * @param metric the metric name
+ * @return true if more rows exist after this one, false if scanner is done
+ * @throws IOException e
+ */
+ public boolean nextRaw(List<KeyValue> result, int limit, String metric) throws IOException;
}
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java?rev=1420004&r1=1420003&r2=1420004&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java Tue Dec 11 05:46:27 2012
@@ -99,6 +99,18 @@ public class TestCoprocessorInterface ex
}
@Override
+ public boolean nextRaw(List<KeyValue> result, int limit, String metric)
+ throws IOException {
+ return delegate.nextRaw(result, limit, metric);
+ }
+
+ @Override
+ public boolean nextRaw(List<KeyValue> result)
+ throws IOException {
+ return delegate.nextRaw(result);
+ }
+
+ @Override
public void close() throws IOException {
delegate.close();
}
@@ -123,6 +135,10 @@ public class TestCoprocessorInterface ex
return delegate.getMaxResultSize();
}
+ @Override
+ public long getMvccReadPoint() {
+ return delegate.getMvccReadPoint();
+ }
}
public static class CoprocessorImpl extends BaseRegionObserver {