You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by la...@apache.org on 2012/12/11 06:46:47 UTC

svn commit: r1420005 - in /hbase/branches/0.94/src: main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/coprocessor/

Author: larsh
Date: Tue Dec 11 05:46:45 2012
New Revision: 1420005

URL: http://svn.apache.org/viewvc?rev=1420005&view=rev
Log:
HBASE-7180 RegionScannerImpl.next() is inefficient.

Modified:
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1420005&r1=1420004&r2=1420005&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Dec 11 05:46:45 2012
@@ -3490,6 +3490,10 @@ public class HRegion implements HeapSize
       this(scan, null);
     }
 
+    @Override
+    public long getMvccReadPoint() {
+      return this.readPt;
+    }
     /**
      * Reset both the filter and the old filter.
      */
@@ -3500,7 +3504,7 @@ public class HRegion implements HeapSize
     }
 
     @Override
-    public synchronized boolean next(List<KeyValue> outResults, int limit)
+    public boolean next(List<KeyValue> outResults, int limit)
         throws IOException {
       return next(outResults, limit, null);
     }
@@ -3520,30 +3524,42 @@ public class HRegion implements HeapSize
         // This could be a new thread from the last time we called next().
         MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);
 
-        results.clear();
-
-        boolean returnResult = nextInternal(limit, metric);
-
-        outResults.addAll(results);
-        resetFilters();
-        if (isFilterDone()) {
-          return false;
-        }
-        return returnResult;
+        return nextRaw(outResults, limit, metric);
       } finally {
         closeRegionOperation();
       }
     }
 
     @Override
-    public synchronized boolean next(List<KeyValue> outResults)
+    public boolean nextRaw(List<KeyValue> outResults, String metric)
+        throws IOException {
+      return nextRaw(outResults, batch, metric);
+    }
+
+    @Override
+    public boolean nextRaw(List<KeyValue> outResults, int limit,
+        String metric) throws IOException {
+      results.clear();
+
+      boolean returnResult = nextInternal(limit, metric);
+
+      outResults.addAll(results);
+      resetFilters();
+      if (isFilterDone()) {
+        return false;
+      }
+      return returnResult;
+    }
+
+    @Override
+    public boolean next(List<KeyValue> outResults)
         throws IOException {
       // apply the batching limit by default
       return next(outResults, batch, null);
     }
 
     @Override
-    public synchronized boolean next(List<KeyValue> outResults, String metric)
+    public boolean next(List<KeyValue> outResults, String metric)
         throws IOException {
       // apply the batching limit by default
       return next(outResults, batch, metric);
@@ -5245,7 +5261,7 @@ public class HRegion implements HeapSize
    * @throws RegionTooBusyException if failed to get the lock in time
    * @throws InterruptedIOException if interrupted while waiting for a lock
    */
-  private void startRegionOperation()
+  public void startRegionOperation()
       throws NotServingRegionException, RegionTooBusyException, InterruptedIOException {
     if (this.closing.get()) {
       throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
@@ -5263,7 +5279,7 @@ public class HRegion implements HeapSize
    * Closes the lock. This needs to be called in the finally block corresponding
    * to the try block of #startRegionOperation
    */
-  private void closeRegionOperation(){
+  public void closeRegionOperation(){
     lock.readLock().unlock();
   }
 

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1420005&r1=1420004&r2=1420005&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Dec 11 05:46:45 2012
@@ -165,6 +165,7 @@ import org.apache.hadoop.util.Reflection
 import org.apache.hadoop.util.StringUtils;
 import org.apache.zookeeper.KeeperException;
 import org.codehaus.jackson.map.ObjectMapper;
+import org.joda.time.field.MillisDurationField;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Lists;
@@ -2430,23 +2431,32 @@ public class HRegionServer implements HR
         }
       }
 
-      for (int i = 0; i < nbRows
-          && currentScanResultSize < maxScannerResultSize; i++) {
-        requestCount.incrementAndGet();
-        // Collect values to be returned here
-        boolean moreRows = s.next(values, SchemaMetrics.METRIC_NEXTSIZE);
-        if (!values.isEmpty()) {
-          for (KeyValue kv : values) {
-            currentScanResultSize += kv.heapSize();
+      MultiVersionConsistencyControl.setThreadReadPoint(s.getMvccReadPoint());
+      region.startRegionOperation();
+      try {
+        int i = 0;
+        synchronized(s) {
+          for (; i < nbRows
+              && currentScanResultSize < maxScannerResultSize; i++) {
+            // Collect values to be returned here
+            boolean moreRows = s.nextRaw(values, SchemaMetrics.METRIC_NEXTSIZE);
+            if (!values.isEmpty()) {
+              for (KeyValue kv : values) {
+                currentScanResultSize += kv.heapSize();
+              }
+              results.add(new Result(values));
+            }
+            if (!moreRows) {
+              break;
+            }
+            values.clear();
           }
-          results.add(new Result(values));
         }
-        if (!moreRows) {
-          break;
-        }
-        values.clear();
+        requestCount.addAndGet(i);
+        region.readRequestsCount.add(i);
+      } finally {
+        region.closeRegionOperation();
       }
-
       // coprocessor postNext hook
       if (region != null && region.getCoprocessorHost() != null) {
         region.getCoprocessorHost().postScannerNext(s, results, nbRows, true);

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java?rev=1420005&r1=1420004&r2=1420005&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java Tue Dec 11 05:46:45 2012
@@ -20,7 +20,10 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.util.List;
+
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
 
 /**
  * RegionScanner describes iterators over rows in an HRegion.
@@ -49,4 +52,50 @@ public interface RegionScanner extends I
    */
   public boolean reseek(byte[] row) throws IOException;
 
+  /**
+   * @return The Scanner's MVCC readPt see {@link MultiVersionConsistencyControl}
+   */
+  public long getMvccReadPoint();
+
+  /**
+   * Grab the next row's worth of values with the default limit on the number of values
+   * to return.
+   * This is a special internal method to be called from coprocessor hooks to avoid expensive setup.
+   * Caller must set the thread's readpoint, start and close a region operation, an synchronize on the scanner object.
+   * See {@link #nextRaw(List, int, String)}
+   * @param result return output array
+   * @param metric the metric name
+   * @return true if more rows exist after this one, false if scanner is done
+   * @throws IOException e
+   */
+  public boolean nextRaw(List<KeyValue> result, String metric) throws IOException;
+
+  /**
+   * Grab the next row's worth of values with a limit on the number of values
+   * to return.
+   * This is a special internal method to be called from coprocessor hooks to avoid expensive setup.
+   * Caller must set the thread's readpoint, start and close a region operation, an synchronize on the scanner object.
+   * Example:
+   * <code><pre>
+   * HRegion region = ...;
+   * RegionScanner scanner = ...
+   * MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint());
+   * region.startRegionOperation();
+   * try {
+   *   synchronized(scanner) {
+   *     ...
+   *     boolean moreRows = scanner.nextRaw(values);
+   *     ...
+   *   }
+   * } finally {
+   *   region.closeRegionOperation();
+   * }
+   * </pre></code>
+   * @param result return output array
+   * @param limit limit on row count to get
+   * @param metric the metric name
+   * @return true if more rows exist after this one, false if scanner is done
+   * @throws IOException e
+   */
+  public boolean nextRaw(List<KeyValue> result, int limit, String metric) throws IOException;
 }

Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java?rev=1420005&r1=1420004&r2=1420005&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java Tue Dec 11 05:46:45 2012
@@ -85,6 +85,18 @@ public class TestCoprocessorInterface ex
     }
 
     @Override
+    public boolean nextRaw(List<KeyValue> result, int limit, String metric) 
+        throws IOException {
+      return delegate.nextRaw(result, limit, metric);
+    }
+
+    @Override
+    public boolean nextRaw(List<KeyValue> result, String metric) 
+        throws IOException {
+      return delegate.nextRaw(result, metric);
+    }
+
+    @Override
     public void close() throws IOException {
       delegate.close();
     }
@@ -104,6 +116,10 @@ public class TestCoprocessorInterface ex
       return false;
     }
 
+    @Override
+    public long getMvccReadPoint() {
+      return delegate.getMvccReadPoint();
+    }
   }
 
   public static class CoprocessorImpl extends BaseRegionObserver {