You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by la...@apache.org on 2012/12/11 06:46:47 UTC
svn commit: r1420005 - in /hbase/branches/0.94/src:
main/java/org/apache/hadoop/hbase/regionserver/
test/java/org/apache/hadoop/hbase/coprocessor/
Author: larsh
Date: Tue Dec 11 05:46:45 2012
New Revision: 1420005
URL: http://svn.apache.org/viewvc?rev=1420005&view=rev
Log:
HBASE-7180 RegionScannerImpl.next() is inefficient.
Modified:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1420005&r1=1420004&r2=1420005&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Dec 11 05:46:45 2012
@@ -3490,6 +3490,10 @@ public class HRegion implements HeapSize
this(scan, null);
}
+ @Override
+ public long getMvccReadPoint() {
+ return this.readPt;
+ }
/**
* Reset both the filter and the old filter.
*/
@@ -3500,7 +3504,7 @@ public class HRegion implements HeapSize
}
@Override
- public synchronized boolean next(List<KeyValue> outResults, int limit)
+ public boolean next(List<KeyValue> outResults, int limit)
throws IOException {
return next(outResults, limit, null);
}
@@ -3520,30 +3524,42 @@ public class HRegion implements HeapSize
// This could be a new thread from the last time we called next().
MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);
- results.clear();
-
- boolean returnResult = nextInternal(limit, metric);
-
- outResults.addAll(results);
- resetFilters();
- if (isFilterDone()) {
- return false;
- }
- return returnResult;
+ return nextRaw(outResults, limit, metric);
} finally {
closeRegionOperation();
}
}
@Override
- public synchronized boolean next(List<KeyValue> outResults)
+ public boolean nextRaw(List<KeyValue> outResults, String metric)
+ throws IOException {
+ return nextRaw(outResults, batch, metric);
+ }
+
+ @Override
+ public boolean nextRaw(List<KeyValue> outResults, int limit,
+ String metric) throws IOException {
+ results.clear();
+
+ boolean returnResult = nextInternal(limit, metric);
+
+ outResults.addAll(results);
+ resetFilters();
+ if (isFilterDone()) {
+ return false;
+ }
+ return returnResult;
+ }
+
+ @Override
+ public boolean next(List<KeyValue> outResults)
throws IOException {
// apply the batching limit by default
return next(outResults, batch, null);
}
@Override
- public synchronized boolean next(List<KeyValue> outResults, String metric)
+ public boolean next(List<KeyValue> outResults, String metric)
throws IOException {
// apply the batching limit by default
return next(outResults, batch, metric);
@@ -5245,7 +5261,7 @@ public class HRegion implements HeapSize
* @throws RegionTooBusyException if failed to get the lock in time
* @throws InterruptedIOException if interrupted while waiting for a lock
*/
- private void startRegionOperation()
+ public void startRegionOperation()
throws NotServingRegionException, RegionTooBusyException, InterruptedIOException {
if (this.closing.get()) {
throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
@@ -5263,7 +5279,7 @@ public class HRegion implements HeapSize
* Closes the lock. This needs to be called in the finally block corresponding
* to the try block of #startRegionOperation
*/
- private void closeRegionOperation(){
+ public void closeRegionOperation(){
lock.readLock().unlock();
}
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1420005&r1=1420004&r2=1420005&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Dec 11 05:46:45 2012
@@ -165,6 +165,7 @@ import org.apache.hadoop.util.Reflection
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.KeeperException;
import org.codehaus.jackson.map.ObjectMapper;
+import org.joda.time.field.MillisDurationField;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
@@ -2430,23 +2431,32 @@ public class HRegionServer implements HR
}
}
- for (int i = 0; i < nbRows
- && currentScanResultSize < maxScannerResultSize; i++) {
- requestCount.incrementAndGet();
- // Collect values to be returned here
- boolean moreRows = s.next(values, SchemaMetrics.METRIC_NEXTSIZE);
- if (!values.isEmpty()) {
- for (KeyValue kv : values) {
- currentScanResultSize += kv.heapSize();
+ MultiVersionConsistencyControl.setThreadReadPoint(s.getMvccReadPoint());
+ region.startRegionOperation();
+ try {
+ int i = 0;
+ synchronized(s) {
+ for (; i < nbRows
+ && currentScanResultSize < maxScannerResultSize; i++) {
+ // Collect values to be returned here
+ boolean moreRows = s.nextRaw(values, SchemaMetrics.METRIC_NEXTSIZE);
+ if (!values.isEmpty()) {
+ for (KeyValue kv : values) {
+ currentScanResultSize += kv.heapSize();
+ }
+ results.add(new Result(values));
+ }
+ if (!moreRows) {
+ break;
+ }
+ values.clear();
}
- results.add(new Result(values));
}
- if (!moreRows) {
- break;
- }
- values.clear();
+ requestCount.addAndGet(i);
+ region.readRequestsCount.add(i);
+ } finally {
+ region.closeRegionOperation();
}
-
// coprocessor postNext hook
if (region != null && region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postScannerNext(s, results, nbRows, true);
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java?rev=1420005&r1=1420004&r2=1420005&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java Tue Dec 11 05:46:45 2012
@@ -20,7 +20,10 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
+import java.util.List;
+
import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
/**
* RegionScanner describes iterators over rows in an HRegion.
@@ -49,4 +52,50 @@ public interface RegionScanner extends I
*/
public boolean reseek(byte[] row) throws IOException;
+ /**
+ * @return the scanner's MVCC read point; see {@link MultiVersionConsistencyControl}
+ */
+ public long getMvccReadPoint();
+
+ /**
+ * Grab the next row's worth of values with the default limit on the number of values
+ * to return.
+ * This is a special internal method to be called from coprocessor hooks to avoid expensive setup.
+ * Caller must set the thread's readpoint, start and close a region operation, and synchronize on the scanner object.
+ * See {@link #nextRaw(List, int, String)}
+ * @param result return output array
+ * @param metric the metric name
+ * @return true if more rows exist after this one, false if scanner is done
+ * @throws IOException e
+ */
+ public boolean nextRaw(List<KeyValue> result, String metric) throws IOException;
+
+ /**
+ * Grab the next row's worth of values with a limit on the number of values
+ * to return.
+ * This is a special internal method to be called from coprocessor hooks to avoid expensive setup.
+ * Caller must set the thread's readpoint, start and close a region operation, and synchronize on the scanner object.
+ * Example:
+ * <code><pre>
+ * HRegion region = ...;
+ * RegionScanner scanner = ...
+ * MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint());
+ * region.startRegionOperation();
+ * try {
+ * synchronized(scanner) {
+ * ...
+ * boolean moreRows = scanner.nextRaw(values, limit, metric);
+ * ...
+ * }
+ * } finally {
+ * region.closeRegionOperation();
+ * }
+ * </pre></code>
+ * @param result return output array
+ * @param limit limit on the number of values to return
+ * @param metric the metric name
+ * @return true if more rows exist after this one, false if scanner is done
+ * @throws IOException e
+ */
+ public boolean nextRaw(List<KeyValue> result, int limit, String metric) throws IOException;
}
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java?rev=1420005&r1=1420004&r2=1420005&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java Tue Dec 11 05:46:45 2012
@@ -85,6 +85,18 @@ public class TestCoprocessorInterface ex
}
@Override
+ public boolean nextRaw(List<KeyValue> result, int limit, String metric)
+ throws IOException {
+ return delegate.nextRaw(result, limit, metric);
+ }
+
+ @Override
+ public boolean nextRaw(List<KeyValue> result, String metric)
+ throws IOException {
+ return delegate.nextRaw(result, metric);
+ }
+
+ @Override
public void close() throws IOException {
delegate.close();
}
@@ -104,6 +116,10 @@ public class TestCoprocessorInterface ex
return false;
}
+ @Override
+ public long getMvccReadPoint() {
+ return delegate.getMvccReadPoint();
+ }
}
public static class CoprocessorImpl extends BaseRegionObserver {