You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by la...@apache.org on 2012/12/11 06:46:29 UTC

svn commit: r1420004 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/coprocessor/

Author: larsh
Date: Tue Dec 11 05:46:27 2012
New Revision: 1420004

URL: http://svn.apache.org/viewvc?rev=1420004&view=rev
Log:
HBASE-7180 RegionScannerImpl.next() is inefficient.

Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1420004&r1=1420003&r2=1420004&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Dec 11 05:46:27 2012
@@ -3467,6 +3467,10 @@ public class HRegion implements HeapSize
       return maxResultSize;
     }
 
+    @Override
+    public long getMvccReadPoint() {
+      return this.readPt;
+    }
     /**
      * Reset both the filter and the old filter.
      */
@@ -3477,7 +3481,7 @@ public class HRegion implements HeapSize
     }
 
     @Override
-    public synchronized boolean next(List<KeyValue> outResults, int limit)
+    public boolean next(List<KeyValue> outResults, int limit)
         throws IOException {
       return next(outResults, limit, null);
     }
@@ -3497,30 +3501,42 @@ public class HRegion implements HeapSize
         // This could be a new thread from the last time we called next().
         MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);
 
-        results.clear();
-
-        boolean returnResult = nextInternal(limit, metric);
-
-        outResults.addAll(results);
-        resetFilters();
-        if (isFilterDone()) {
-          return false;
-        }
-        return returnResult;
+        return nextRaw(outResults, limit, metric);
       } finally {
         closeRegionOperation();
       }
     }
 
     @Override
-    public synchronized boolean next(List<KeyValue> outResults)
+    public boolean nextRaw(List<KeyValue> outResults)
+        throws IOException {
+      return nextRaw(outResults, batch, null);
+    }
+
+    @Override
+    public boolean nextRaw(List<KeyValue> outResults, int limit,
+        String metric) throws IOException {
+      results.clear();
+
+      boolean returnResult = nextInternal(limit, metric);
+
+      outResults.addAll(results);
+      resetFilters();
+      if (isFilterDone()) {
+        return false;
+      }
+      return returnResult;
+    }
+
+    @Override
+    public boolean next(List<KeyValue> outResults)
         throws IOException {
       // apply the batching limit by default
       return next(outResults, batch, null);
     }
 
     @Override
-    public synchronized boolean next(List<KeyValue> outResults, String metric)
+    public boolean next(List<KeyValue> outResults, String metric)
         throws IOException {
       // apply the batching limit by default
       return next(outResults, batch, metric);
@@ -5274,7 +5290,7 @@ public class HRegion implements HeapSize
    * @throws RegionTooBusyException if failed to get the lock in time
    * @throws InterruptedIOException if interrupted while waiting for a lock
    */
-  private void startRegionOperation()
+  public void startRegionOperation()
       throws NotServingRegionException, RegionTooBusyException, InterruptedIOException {
     if (this.closing.get()) {
       throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
@@ -5292,7 +5308,7 @@ public class HRegion implements HeapSize
    * Closes the lock. This needs to be called in the finally block corresponding
    * to the try block of #startRegionOperation
    */
-  private void closeRegionOperation() {
+  public void closeRegionOperation() {
     lock.readLock().unlock();
   }
 

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1420004&r1=1420003&r2=1420004&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Dec 11 05:46:27 2012
@@ -2912,20 +2912,30 @@ public class  HRegionServer implements C
                 maxResultSize = maxScannerResultSize;
               }
               List<KeyValue> values = new ArrayList<KeyValue>();
-              for (int i = 0; i < rows
-                  && currentScanResultSize < maxResultSize; i++) {
-                // Collect values to be returned here
-                boolean moreRows = scanner.next(values);
-                if (!values.isEmpty()) {
-                  for (KeyValue kv : values) {
-                    currentScanResultSize += kv.heapSize();
+              MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint());
+              region.startRegionOperation();
+              try {
+                int i = 0;
+                synchronized(scanner) {
+                  for (; i < rows
+                      && currentScanResultSize < maxResultSize; i++) {
+                    // Collect values to be returned here
+                    boolean moreRows = scanner.nextRaw(values);
+                    if (!values.isEmpty()) {
+                      for (KeyValue kv : values) {
+                        currentScanResultSize += kv.heapSize();
+                      }
+                      results.add(new Result(values));
+                    }
+                    if (!moreRows) {
+                      break;
+                    }
+                    values.clear();
                   }
-                  results.add(new Result(values));
                 }
-                if (!moreRows) {
-                  break;
-                }
-                values.clear();
+                region.readRequestsCount.add(i);
+              } finally {
+                region.closeRegionOperation();
               }
 
               // coprocessor postNext hook

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java?rev=1420004&r1=1420003&r2=1420004&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java Tue Dec 11 05:46:27 2012
@@ -19,9 +19,11 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
 
 /**
@@ -56,4 +58,50 @@ public interface RegionScanner extends I
    * @return The preferred max buffersize. See {@link Scan#setMaxResultSize(long)}
    */
   public long getMaxResultSize();
+
+  /**
+   * @return The scanner's MVCC read point; see {@link MultiVersionConsistencyControl}
+   */
+  public long getMvccReadPoint();
+
+  /**
+   * Grab the next row's worth of values with the default limit on the number of values
+   * to return.
+   * This is a special internal method to be called from coprocessor hooks to avoid expensive setup.
+   * Caller must set the thread's readpoint, start and close a region operation, and synchronize on the scanner object.
+   * See {@link #nextRaw(List, int, String)}
+   * @param result return output array
+   * @return true if more rows exist after this one, false if scanner is done
+   * @throws IOException e
+   */
+  public boolean nextRaw(List<KeyValue> result) throws IOException;
+
+  /**
+   * Grab the next row's worth of values with a limit on the number of values
+   * to return.
+   * This is a special internal method to be called from coprocessor hooks to avoid expensive setup.
+   * Caller must set the thread's readpoint, start and close a region operation, and synchronize on the scanner object.
+   * Example:
+   * <code><pre>
+   * HRegion region = ...;
+   * RegionScanner scanner = ...
+   * MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint());
+   * region.startRegionOperation();
+   * try {
+   *   synchronized(scanner) {
+   *     ...
+   *     boolean moreRows = scanner.nextRaw(values);
+   *     ...
+   *   }
+   * } finally {
+   *   region.closeRegionOperation();
+   * }
+   * </pre></code>
+   * @param result return output array
+   * @param limit limit on the number of values to return
+   * @param metric the metric name
+   * @return true if more rows exist after this one, false if scanner is done
+   * @throws IOException e
+   */
+  public boolean nextRaw(List<KeyValue> result, int limit, String metric) throws IOException;
 }

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java?rev=1420004&r1=1420003&r2=1420004&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java Tue Dec 11 05:46:27 2012
@@ -99,6 +99,18 @@ public class TestCoprocessorInterface ex
     }
 
     @Override
+    public boolean nextRaw(List<KeyValue> result, int limit, String metric) 
+        throws IOException {
+      return delegate.nextRaw(result, limit, metric);
+    }
+
+    @Override
+    public boolean nextRaw(List<KeyValue> result) 
+        throws IOException {
+      return delegate.nextRaw(result);
+    }
+
+    @Override
     public void close() throws IOException {
       delegate.close();
     }
@@ -123,6 +135,10 @@ public class TestCoprocessorInterface ex
       return delegate.getMaxResultSize();
     }
 
+    @Override
+    public long getMvccReadPoint() {
+      return delegate.getMvccReadPoint();
+    }
   }
 
   public static class CoprocessorImpl extends BaseRegionObserver {