Posted to commits@hbase.apache.org by li...@apache.org on 2013/03/23 19:18:39 UTC

svn commit: r1460199 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/regionserver/

Author: liyin
Date: Sat Mar 23 18:18:39 2013
New Revision: 1460199

URL: http://svn.apache.org/r1460199
Log:
[HBASE-8185] Pulling HRegionScanner code outside the HRegion class.

Author: manukranthk

Summary: This diff decomposes diff D708638 into a few smaller diffs for a quicker and easier review process.

Test Plan: Since this is a pure refactoring change, verify it by running the MR unit tests.

Reviewers: liyintang, rshroff, aaiyer

Reviewed By: liyintang

CC: hbase-eng@, erling, arice

Differential Revision: https://phabricator.fb.com/D743249

Task ID: 2103689

Added:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionContext.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreThreadUtils.java
Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
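
The shape of the refactor is the classic parameter-object move: state that the inner scanner class used to reach implicitly through HRegion is now passed explicitly through RegionContext. A minimal sketch of the pattern, with names simplified from the actual classes in this diff:

    import java.util.concurrent.atomic.AtomicInteger;

    // Simplified stand-ins for the real classes in this commit.
    class Context {                                  // cf. RegionContext
      final AtomicInteger rowReadCnt;
      Context(AtomicInteger rowReadCnt) { this.rowReadCnt = rowReadCnt; }
    }

    class Scanner {                                  // cf. RegionScanner
      private final AtomicInteger rowReadCnt;
      Scanner(Context ctx) { this.rowReadCnt = ctx.rowReadCnt; }
    }

    class Region {                                   // cf. HRegion
      private final AtomicInteger rowReadCnt = new AtomicInteger(0);
      Scanner getScanner() { return new Scanner(new Context(rowReadCnt)); }
    }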

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1460199&r1=1460198&r2=1460199&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sat Mar 23 18:18:39 2013
@@ -91,8 +91,6 @@ import org.apache.hadoop.hbase.ipc.HRegi
 import org.apache.hadoop.hbase.metrics.RequestMetrics;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
-import org.apache.hadoop.hbase.regionserver.HRegion.RegionScanner.ScanPrefetcher;
-import org.apache.hadoop.hbase.regionserver.HRegion.RegionScanner.ScanResult;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
@@ -587,8 +585,9 @@ public class HRegion implements HeapSize
       if (!families.isEmpty()) {
         // initialize the thread pool for opening stores in parallel.
         ThreadPoolExecutor storeOpenerThreadPool =
-          getStoreOpenAndCloseThreadPool(
-            "StoreOpenerThread-" + this.regionInfo.getRegionNameAsString());
+          StoreThreadUtils.getStoreOpenAndCloseThreadPool(
+            "StoreOpenerThread-" + this.regionInfo.getRegionNameAsString(),
+            this.getRegionInfo(), this.conf);
         CompletionService<Store> completionService =
           new ExecutorCompletionService<Store>(storeOpenerThreadPool);
 
@@ -857,8 +856,8 @@ public class HRegion implements HeapSize
           if (!stores.isEmpty()) {
             // initialize the thread pool for closing stores in parallel.
             ThreadPoolExecutor storeCloserThreadPool =
-              getStoreOpenAndCloseThreadPool("StoreCloserThread-"
-                + this.regionInfo.getRegionNameAsString());
+              StoreThreadUtils.getStoreOpenAndCloseThreadPool("StoreCloserThread-"
+                + this.regionInfo.getRegionNameAsString(), this.getRegionInfo(), this.conf);
             CompletionService<ImmutableList<StoreFile>> completionService =
               new ExecutorCompletionService<ImmutableList<StoreFile>>(
                 storeCloserThreadPool);
@@ -902,41 +901,6 @@ public class HRegion implements HeapSize
     }
   }
 
-  protected ThreadPoolExecutor getStoreOpenAndCloseThreadPool(
-      final String threadNamePrefix) {
-    int numStores = Math.max(1, this.regionInfo.getTableDesc().families.size());
-    int maxThreads = Math.min(numStores,
-        conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
-            HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX));
-    return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
-  }
-
-  protected ThreadPoolExecutor getStoreFileOpenAndCloseThreadPool(
-      final String threadNamePrefix) {
-    int numStores = Math.max(1, this.regionInfo.getTableDesc().families.size());
-    int maxThreads = Math.max(1,
-        conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
-            HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX)
-            / numStores);
-    return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
-  }
-
-  private ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
-      final String threadNamePrefix) {
-    ThreadPoolExecutor openAndCloseThreadPool = Threads
-        .getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
-            new ThreadFactory() {
-              private int count = 1;
-
-              public Thread newThread(Runnable r) {
-                Thread t = new Thread(r, threadNamePrefix + "-" + count++);
-                t.setDaemon(true);
-                return t;
-              }
-            });
-    return openAndCloseThreadPool;
-  }
-
    /**
     * @return True if its worth doing a flush before we put up the close flag.
     */
@@ -1686,7 +1650,10 @@ public class HRegion implements HeapSize
   }
 
   protected InternalScanner instantiateInternalScanner(Scan scan, List<KeyValueScanner> additionalScanners) throws IOException {
-    return new RegionScanner(scan, additionalScanners);
+    RegionContext regionContext = new RegionContext(stores, scannerReadPoints,
+        comparator, mvcc, closing, closed, regionInfo, rowReadCnt);
+    return new RegionScanner(scan, additionalScanners, regionContext);
   }
 
   /*
@@ -2997,408 +2964,6 @@ public class HRegion implements HeapSize
     return this.tableDir;
   }
 
-  /**
-   * RegionScanner is an iterator through a bunch of rows in an HRegion.
-   * <p>
-   * It is used to combine scanners from multiple Stores (aka column families).
-   */
-  class RegionScanner implements InternalScanner {
-    // Package local for testability
-    KeyValueHeap storeHeap = null;
-    private final byte [] stopRow;
-    private Filter filter;
-    private final int batch;
-    private int isScan;
-    private boolean filterClosed = false;
-    private long readPt;
-    private Scan originalScan;
-    private Future<ScanResult> prefetchScanFuture = null;
-
-    RegionScanner(Scan scan, List<KeyValueScanner> additionalScanners) throws IOException {
-      //DebugPrint.println("HRegionScanner.<init>");
-
-      this.originalScan = scan;
-
-      this.filter = scan.getFilter();
-      this.batch = scan.getBatch();
-      if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
-        this.stopRow = null;
-      } else {
-        this.stopRow = scan.getStopRow();
-      }
-      // If we are doing a get, we want [startRow,endRow]; normally
-      // it is [startRow,endRow), and if startRow == endRow we get nothing.
-      this.isScan = scan.isGetScan() ? -1 : 0;
-
-      // synchronize on scannerReadPoints so that nobody calculates
-      // getSmallestReadPoint before scannerReadPoints is updated.
-      synchronized(scannerReadPoints) {
-        this.readPt = MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
-        scannerReadPoints.put(this, this.readPt);
-      }
-
-      List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
-      if (additionalScanners != null) {
-        scanners.addAll(additionalScanners);
-      }
-
-      for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
-          scan.getFamilyMap().entrySet()) {
-        Store store = stores.get(entry.getKey());
-        StoreScanner scanner = store.getScanner(scan, entry.getValue());
-        scanners.add(scanner);
-      }
-      this.storeHeap = new KeyValueHeap(scanners, comparator);
-    }
-
-    RegionScanner(Scan scan) throws IOException {
-      this(scan, null);
-    }
-
-    /**
-     * Reset the filter.
-     */
-    protected void resetFilters() {
-      if (filter != null) {
-        filter.reset();
-      }
-    }
-
-    @Override
-    public boolean next(List<KeyValue> outResults, int limit)
-        throws IOException {
-      return next(outResults, limit, null);
-    }
-
-    private void preCondition() throws IOException{
-      if (this.filterClosed) {
-        throw new UnknownScannerException("Scanner was closed (timed out?) " +
-            "after we renewed it. Could be caused by a very slow scanner " +
-            "or a lengthy garbage collection");
-      }
-      if (closing.get() || closed.get()) {
-        close();
-        throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
-          " is closing=" + closing.get() + " or closed=" + closed.get());
-      }
-
-      // This could be a new thread from the last time we called next().
-      MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);
-    }
-
-    /**
-     * Holds the result of a single scanner invocation: the list of Result
-     * objects if the pre-fetch next was successful, or the exception if
-     * the next failed.
-     */
-    class ScanResult {
-      final boolean isException;
-      IOException ioException = null;
-      Result[] outResults;
-      boolean moreRows;
-
-      public ScanResult(IOException ioException) {
-        isException = true;
-        this.ioException = ioException;
-      }
-
-      public ScanResult(boolean moreRows, Result[] outResults) {
-        isException = false;
-        this.moreRows = moreRows;
-        this.outResults = outResults;
-      }
-    }
-
-    /**
-     * This Callable abstracts calling a pre-fetch next. This is called on a
-     * threadpool. It makes a pre-fetch next call with the same parameters as
-     * the incoming next call. Note that the number of rows to return (nbRows)
-     * and/or the memory size for the result is the same as the previous call if
-     * pre-fetching is enabled. If these params change dynamically, they will
-     * take effect in the subsequent iteration.
-     */
-    class ScanPrefetcher implements Callable<ScanResult> {
-      int nbRows;
-      int limit;
-      String metric;
-
-      ScanPrefetcher(int nbRows, int limit, String metric) {
-        this.nbRows = nbRows;
-        this.limit = limit;
-        this.metric = metric;
-      }
-
-      public ScanResult call() {
-        ScanResult scanResult = null;
-        List<Result> outResults = new ArrayList<Result>();
-        List<KeyValue> tmpList = new ArrayList<KeyValue>();
-        int currentNbRows = 0;
-        boolean moreRows = true;
-        try {
-          // This is necessary b/c partialResponseSize is not serialized through
-          // RPC
-          getOriginalScan().setCurrentPartialResponseSize(0);
-          int maxResponseSize = getOriginalScan().getMaxResponseSize();
-          do {
-            moreRows = nextInternal(tmpList, limit, metric);
-            if (!tmpList.isEmpty()) {
-              currentNbRows++;
-              if (outResults != null) {
-                outResults.add(new Result(tmpList));
-                tmpList.clear();
-              }
-            }
-            resetFilters();
-            if (isFilterDone()) {
-              break;
-            }
-
-            // Loop conditions:
-            // 1. respect maxResponseSize and nbRows, whichever comes first;
-            // 2. recheck currentPartialResponseSize to catch the case
-            // where maxResponseSize is saturated and partialRow == false,
-            // since nextInternal() allows that case as valid.
-          } while (moreRows
-              && (getOriginalScan().getCurrentPartialResponseSize() < 
-                  maxResponseSize && currentNbRows < nbRows));
-          scanResult = new ScanResult(moreRows, 
-              outResults.toArray(new Result[0]));
-        } catch (IOException e) {
-          // we should queue the exception as the result so that we can return
-          // this when the result is asked for
-          scanResult = new ScanResult(e);
-        }
-        return scanResult;
-      }
-    }
-
-    /**
-     * Returns all the rows that can fit in the response size, respecting
-     * two stop conditions: 1) scan.getMaxResponseSize and
-     * 2) scan.getCaching() (which is nbRows); the loop breaks on whichever
-     * comes first. This is only used by Scans, not Gets.
-     * @param nbRows the maximum number of rows that can be returned
-     * @param metric the metric name
-     * @return the next batch of rows, or null if the scan is done.
-     */
-    public synchronized Result[] nextRows(int nbRows, String metric) 
-    throws IOException {
-      preCondition();
-      boolean prefetchingEnabled = getOriginalScan().getServerPrefetching();
-      int limit = this.getOriginalScan().getBatch();
-      ScanResult scanResult;
-      // if we have a prefetched result, then use it
-      if (prefetchingEnabled && prefetchScanFuture != null) {
-        try {
-          scanResult = prefetchScanFuture.get();
-          prefetchScanFuture = null;
-        } catch (InterruptedException e) {
-          throw new IOException(e);
-        } catch (ExecutionException e) {
-          throw new IOException(e);
-        }
-        if (scanResult.isException) {
-          throw scanResult.ioException;
-        }
-      }
-      // if there are no prefetched results, then perform the scan inline
-      else {
-        ScanPrefetcher scanFetch = new ScanPrefetcher(nbRows, limit, metric);
-        scanResult = scanFetch.call();
-      }
-
-      if (scanResult.isException) {
-        throw scanResult.ioException;
-      }
-
-      // schedule a background prefetch for the next result if prefetch is
-      // enabled on scans
-      boolean scanDone = 
-        (scanResult.outResults == null || scanResult.outResults.length == 0);
-      if (prefetchingEnabled && !scanDone) {
-        ScanPrefetcher callable = new ScanPrefetcher(nbRows, limit, metric);
-        prefetchScanFuture = HRegionServer.scanPrefetchThreadPool.submit(callable);
-      }
-      if (!scanDone) {
-        rowReadCnt.addAndGet(scanResult.outResults.length);
-      }
-      return scanResult.outResults == null || 
-          (isFilterDone() && scanResult.outResults.length == 0) ?
-          null : scanResult.outResults;
-    }
-    
-    /**
-     * This is used by Gets & unit tests, whereas nextRows() is
-     * used by Scans
-     */
-    @Override
-    public synchronized boolean next(List<KeyValue> outResults, int limit,
-        String metric) throws IOException {
-      preCondition();
-      boolean returnResult;
-      if (outResults.isEmpty()) {
-         // Usually outResults is empty. This is true when next is called
-         // to handle a scan or get operation.
-        returnResult = nextInternal(outResults, limit, metric);
-      } else {
-        List<KeyValue> tmpList = new ArrayList<KeyValue>();
-        returnResult = nextInternal(tmpList, limit, metric);
-        outResults.addAll(tmpList);
-      }
-      rowReadCnt.incrementAndGet();
-      resetFilters();
-      if (isFilterDone()) {
-        return false;
-      }
-      return returnResult;
-    }
-
-    @Override
-    public boolean next(List<KeyValue> outResults)
-        throws IOException {
-      // apply the batching limit by default
-      return next(outResults, batch, null);
-    }
-
-    @Override
-    public boolean next(List<KeyValue> outResults, String metric)
-        throws IOException {
-      // apply the batching limit by default
-      return next(outResults, batch, metric);
-    }
-
-    /*
-     * @return true if the filter rules that the scanner is done.
-     */
-    private boolean isFilterDone() {
-      return this.filter != null && this.filter.filterAllRemaining();
-    }
-
-    /**
-     * @param results empty list in which results will be stored
-     */
-    private boolean nextInternal(List<KeyValue> results, int limit, String metric)
-        throws IOException {
-
-      if (!results.isEmpty()) {
-        throw new IllegalArgumentException("First parameter should be an empty list");
-      }
-
-      boolean partialRow = getOriginalScan().isPartialRow();
-      long maxResponseSize = getOriginalScan().getMaxResponseSize();
-      
-      while (true) {
-        byte [] currentRow = peekRow();
-        if (isStopRow(currentRow)) {
-          if (filter != null && filter.hasFilterRow()) {
-            filter.filterRow(results);
-          }
-          if (filter != null && filter.filterRow()) {
-            results.clear();
-          }
-
-          return false;
-        } else if (filterRowKey(currentRow)) {
-          nextRow(currentRow);
-          results.clear();
-        } else {
-          byte [] nextRow;
-          do {
-            this.storeHeap.next(results, limit - results.size(), metric);
-            if (limit > 0 && results.size() == limit) {
-              if (this.filter != null && filter.hasFilterRow()) 
-                throw new IncompatibleFilterException(
-                  "Filter with filterRow(List<KeyValue>) incompatible with scan with limit!");
-              return true; // more rows are expected, but we are limited in how many we can return.
-            }
-            // this guarantees that we still complete the entire row if
-            // currentPartialResponseSize exceeds the maxResponseSize.
-            if (partialRow && getOriginalScan().getCurrentPartialResponseSize()
-                 >= maxResponseSize) {
-              return true;
-            }
-          } while (Bytes.equals(currentRow, nextRow = peekRow()));
-
-          final boolean stopRow = isStopRow(nextRow);
-
-          // now that we have an entire row, let's process it with the filters:
-
-          // first filter with the filterRow(List)
-          if (filter != null && filter.hasFilterRow()) {
-            filter.filterRow(results);
-          }
-
-          if (results.isEmpty() || filterRow()) {
-            nextRow(currentRow);
-            results.clear();
-
-            // This row was totally filtered out, if this is NOT the last row,
-            // we should continue on.
-
-            if (!stopRow) continue;
-          }
-          return !stopRow;
-        }
-      }
-    }
-
-    private boolean filterRow() {
-      return filter != null
-          && filter.filterRow();
-    }
-    private boolean filterRowKey(byte[] row) {
-      return filter != null
-          && filter.filterRowKey(row, 0, row.length);
-    }
-
-    protected void nextRow(byte [] currentRow) throws IOException {
-      while (Bytes.equals(currentRow, peekRow())) {
-        this.storeHeap.next(MOCKED_LIST);
-      }
-      resetFilters();
-    }
-
-    private byte[] peekRow() {
-      KeyValue kv = this.storeHeap.peek();
-      return kv == null ? null : kv.getRow();
-    }
-
-    private boolean isStopRow(byte [] currentRow) {
-      return currentRow == null ||
-          (stopRow != null &&
-          comparator.compareRows(stopRow, 0, stopRow.length,
-              currentRow, 0, currentRow.length) <= isScan);
-    }
-
-    @Override
-    public synchronized void close() {
-      if (storeHeap != null) {
-        storeHeap.close();
-        storeHeap = null;
-      }
-      // no need to synchronize here.
-      scannerReadPoints.remove(this);
-      this.filterClosed = true;
-    }
-
-    KeyValueHeap getStoreHeapForTesting() {
-      return storeHeap;
-    }
-
-    /**
-     * Get the original scan object that was used to create this internal one
-     * @return original scan object... used for debug output
-     */
-    public Scan getOriginalScan() {
-      return originalScan;
-    }
-  }
-
   // Utility methods
   /**
    * A utility method to create new instances of HRegion based on the
@@ -4283,7 +3848,7 @@ public class HRegion implements HeapSize
   /**
   * A mocked list implementation - discards all updates.
    */
-  private static final List<KeyValue> MOCKED_LIST = new AbstractList<KeyValue>() {
+  public static final List<KeyValue> MOCKED_LIST = new AbstractList<KeyValue>() {
 
     @Override
     public void add(int index, KeyValue element) {
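
MOCKED_LIST (shown truncated above) is widened from private to public so the extracted RegionScanner can keep using it: it is a write-discarding list that lets nextRow() advance the store heap without accumulating KeyValues. A self-contained sketch of the same trick (illustrative, not the committed code):

    import java.util.AbstractList;

    // A list that silently drops every element added to it, so a scanner
    // can "collect" into it while skipping a row without allocating.
    public class DiscardingList<E> extends AbstractList<E> {
      @Override
      public void add(int index, E element) {
        // discard; AbstractList.add(E) delegates here
      }
      @Override
      public E get(int index) {
        throw new UnsupportedOperationException("holds no elements");
      }
      @Override
      public int size() {
        return 0;
      }
    }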

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1460199&r1=1460198&r2=1460199&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Sat Mar 23 18:18:39 2013
@@ -2635,8 +2635,8 @@ public class HRegionServer implements HR
       long scannerId = (Long) params[0];
       String scannerName = String.valueOf(scannerId);
       InternalScanner s = regionServer.scanners.get(scannerName);
-      if (s != null && s instanceof HRegion.RegionScanner) {
-        res.put("scan", ((HRegion.RegionScanner)s).getOriginalScan().toMap(DEFAULT_MAX_COLS));
+      if (s != null && s instanceof RegionScanner) {
+        res.put("scan", ((RegionScanner)s).getOriginalScan().toMap(DEFAULT_MAX_COLS));
       }
 
       if (params.length > 1) {
@@ -2653,7 +2653,7 @@ public class HRegionServer implements HR
       String scannerName = String.valueOf(scannerId);
       // HRegionServer only deals with RegionScanner,
       // thus we just typecast directly
-      HRegion.RegionScanner s = (HRegion.RegionScanner)this.scanners.get(scannerName);
+      RegionScanner s = (RegionScanner)this.scanners.get(scannerName);
       if (s == null) {
         throw new UnknownScannerException("Name: " + scannerName);
       }

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionContext.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionContext.java?rev=1460199&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionContext.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionContext.java Sat Mar 23 18:18:39 2013
@@ -0,0 +1,115 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+/**
+ * RegionContext is a container for the HRegion member variables that
+ * RegionScanner needs in its constructor. While RegionScanner was an inner
+ * class of HRegion, these members were directly accessible; now that
+ * RegionScanner is a top-level class, they are packaged into RegionContext.
+ *
+ * @author manukranthk
+ *
+ */
+public class RegionContext {
+  final private Map<byte[], Store> stores;
+  final private ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
+  final private KeyValue.KVComparator comparator;
+  final private MultiVersionConsistencyControl mvcc;
+  final private AtomicBoolean closing;
+  final private AtomicBoolean closed;
+  final private HRegionInfo regionInfo;
+  final private AtomicInteger rowReadCnt;
+
+  public RegionContext(Map<byte[], Store> stores,
+      ConcurrentHashMap<RegionScanner, Long> scannerReadPoints,
+      KeyValue.KVComparator comparator,
+      MultiVersionConsistencyControl mvcc,
+      AtomicBoolean closing,
+      AtomicBoolean closed,
+      HRegionInfo regionInfo,
+      AtomicInteger rowReadCnt) {
+    this.stores = stores;
+    this.scannerReadPoints = scannerReadPoints;
+    this.comparator = comparator;
+    this.mvcc = mvcc;
+    this.closing = closing;
+    this.closed = closed;
+    this.regionInfo = regionInfo;
+    this.rowReadCnt = rowReadCnt;
+  }
+
+  /*
+   * Constructor to create a region context for
+   * opening a read-only RegionScanner.
+   */
+  public RegionContext(Map<byte[], Store> stores, HRegionInfo regionInfo) {
+    this.stores = stores;
+    this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
+    this.comparator = regionInfo.getComparator();
+    this.mvcc = new MultiVersionConsistencyControl();
+    this.mvcc.setThreadReadPoint(Long.MAX_VALUE);
+    this.closing = new AtomicBoolean(false);
+    this.closed = new AtomicBoolean(false);
+    this.regionInfo = regionInfo;
+    this.rowReadCnt = new AtomicInteger(0);
+  }
+
+  public Map<byte[], Store> getStores() {
+    return this.stores;
+  }
+
+  public ConcurrentHashMap<RegionScanner, Long> getScannerReadPoints() {
+    return this.scannerReadPoints;
+  }
+
+  public KeyValue.KVComparator getComparator() {
+    return this.comparator;
+  }
+
+  public MultiVersionConsistencyControl getMvcc() {
+    return this.mvcc;
+  }
+
+  public AtomicBoolean getClosing() {
+    return this.closing;
+  }
+
+  public AtomicBoolean getClosed() {
+    return this.closed;
+  }
+
+  public HRegionInfo getRegionInfo() {
+    return this.regionInfo;
+  }
+
+  public AtomicInteger getRowReadCnt() {
+    return this.rowReadCnt;
+  }
+}
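
The second constructor supports opening a scanner without a live HRegion. A hypothetical caller (the helper name and arguments here are assumptions; only the RegionContext and RegionScanner constructors are from this commit) might wire it up like this:

    import java.io.IOException;
    import java.util.Map;
    import org.apache.hadoop.hbase.HRegionInfo;
    import org.apache.hadoop.hbase.client.Scan;

    // Hypothetical helper in the same package as RegionContext/RegionScanner.
    public class ReadOnlyScanExample {
      public static RegionScanner openReadOnlyScanner(
          Map<byte[], Store> stores, HRegionInfo regionInfo, Scan scan)
          throws IOException {
        // Fresh MVCC, closing/closed both false, read point at Long.MAX_VALUE.
        RegionContext context = new RegionContext(stores, regionInfo);
        return new RegionScanner(scan, null, context);
      }
    }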

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java?rev=1460199&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java Sat Mar 23 18:18:39 2013
@@ -0,0 +1,449 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * RegionScanner is an iterator through a bunch of rows in an HRegion.
+ * <p>
+ * It is used to combine scanners from multiple Stores (aka column families).
+ */
+public class RegionScanner implements InternalScanner {
+  // Package local for testability
+  KeyValueHeap storeHeap = null;
+  private final byte [] stopRow;
+  private Filter filter;
+  private final int batch;
+  private int isScan;
+  private boolean filterClosed = false;
+  private long readPt;
+  private Scan originalScan;
+  private Future<ScanResult> prefetchScanFuture = null;
+  private Map<byte[], Store> stores;
+  private KeyValue.KVComparator comparator;
+  private ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
+  private MultiVersionConsistencyControl mvcc;
+  private AtomicBoolean closing;
+  private AtomicBoolean closed;
+  private HRegionInfo regionInfo;
+  private AtomicInteger rowReadCnt;
+  private static final List<KeyValue> MOCKED_LIST = HRegion.MOCKED_LIST;
+
+  public RegionScanner(Scan scan, List<KeyValueScanner> additionalScanners, RegionContext regionContext)
+    throws IOException {
+    this.stores = regionContext.getStores();
+    this.scannerReadPoints = regionContext.getScannerReadPoints();
+    this.comparator = regionContext.getComparator();
+    this.mvcc = regionContext.getMvcc();
+    this.closing = regionContext.getClosing();
+    this.closed = regionContext.getClosed();
+    this.regionInfo = regionContext.getRegionInfo();
+    this.rowReadCnt = regionContext.getRowReadCnt();
+    this.originalScan = scan;
+
+    this.filter = scan.getFilter();
+    this.batch = scan.getBatch();
+    if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
+      this.stopRow = null;
+    } else {
+      this.stopRow = scan.getStopRow();
+    }
+    // If we are doing a get, we want [startRow,endRow]; normally
+    // it is [startRow,endRow), and if startRow == endRow we get nothing.
+    this.isScan = scan.isGetScan() ? -1 : 0;
+
+    // synchronize on scannerReadPoints so that nobody calculates
+    // getSmallestReadPoint before scannerReadPoints is updated.
+    synchronized(scannerReadPoints) {
+      this.readPt = MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
+      scannerReadPoints.put(this, this.readPt);
+    }
+
+    List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
+    if (additionalScanners != null) {
+      scanners.addAll(additionalScanners);
+    }
+
+    for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
+        scan.getFamilyMap().entrySet()) {
+      Store store = stores.get(entry.getKey());
+      StoreScanner scanner = store.getScanner(scan, entry.getValue());
+      scanners.add(scanner);
+    }
+    this.storeHeap = new KeyValueHeap(scanners, comparator);
+  }
+
+  /**
+   * Reset the filter.
+   */
+  protected void resetFilters() {
+    if (filter != null) {
+      filter.reset();
+    }
+  }
+
+  @Override
+  public boolean next(List<KeyValue> outResults, int limit)
+      throws IOException {
+    return next(outResults, limit, null);
+  }
+
+  private void preCondition() throws IOException{
+    if (this.filterClosed) {
+      throw new UnknownScannerException("Scanner was closed (timed out?) " +
+          "after we renewed it. Could be caused by a very slow scanner " +
+          "or a lengthy garbage collection");
+    }
+    if (closing.get() || closed.get()) {
+      close();
+      throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
+        " is closing=" + closing.get() + " or closed=" + closed.get());
+    }
+
+    // This could be a new thread from the last time we called next().
+    MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);
+  }
+
+  /**
+   * Holds the result of a single scanner invocation: the list of Result
+   * objects if the pre-fetch next was successful, or the exception if
+   * the next failed.
+   */
+  class ScanResult {
+    final boolean isException;
+    IOException ioException = null;
+    Result[] outResults;
+    boolean moreRows;
+
+    public ScanResult(IOException ioException) {
+      isException = true;
+      this.ioException = ioException;
+    }
+
+    public ScanResult(boolean moreRows, Result[] outResults) {
+      isException = false;
+      this.moreRows = moreRows;
+      this.outResults = outResults;
+    }
+  }
+
+  /**
+   * This Callable abstracts calling a pre-fetch next. This is called on a
+   * threadpool. It makes a pre-fetch next call with the same parameters as
+   * the incoming next call. Note that the number of rows to return (nbRows)
+   * and/or the memory size for the result is the same as the previous call if
+   * pre-fetching is enabled. If these params change dynamically, they will
+   * take effect in the subsequent iteration.
+   */
+  class ScanPrefetcher implements Callable<ScanResult> {
+    int nbRows;
+    int limit;
+    String metric;
+
+    ScanPrefetcher(int nbRows, int limit, String metric) {
+      this.nbRows = nbRows;
+      this.limit = limit;
+      this.metric = metric;
+    }
+
+    public ScanResult call() {
+      ScanResult scanResult = null;
+      List<Result> outResults = new ArrayList<Result>();
+      List<KeyValue> tmpList = new ArrayList<KeyValue>();
+      int currentNbRows = 0;
+      boolean moreRows = true;
+      try {
+        // This is necessary b/c partialResponseSize is not serialized through
+        // RPC
+        getOriginalScan().setCurrentPartialResponseSize(0);
+        int maxResponseSize = getOriginalScan().getMaxResponseSize();
+        do {
+          moreRows = nextInternal(tmpList, limit, metric);
+          if (!tmpList.isEmpty()) {
+            currentNbRows++;
+            if (outResults != null) {
+              outResults.add(new Result(tmpList));
+              tmpList.clear();
+            }
+          }
+          resetFilters();
+          if (isFilterDone()) {
+            break;
+          }
+
+          // Loop conditions:
+          // 1. respect maxResponseSize and nbRows, whichever comes first;
+          // 2. recheck currentPartialResponseSize to catch the case
+          // where maxResponseSize is saturated and partialRow == false,
+          // since nextInternal() allows that case as valid.
+        } while (moreRows
+            && (getOriginalScan().getCurrentPartialResponseSize() <
+                maxResponseSize && currentNbRows < nbRows));
+        scanResult = new ScanResult(moreRows,
+            outResults.toArray(new Result[0]));
+      } catch (IOException e) {
+        // we should queue the exception as the result so that we can return
+        // this when the result is asked for
+        scanResult = new ScanResult(e);
+      }
+      return scanResult;
+    }
+  }
+
+  /**
+   * Returns all the rows that can fit in the response size, respecting
+   * two stop conditions: 1) scan.getMaxResponseSize and
+   * 2) scan.getCaching() (which is nbRows); the loop breaks on whichever
+   * comes first. This is only used by Scans, not Gets.
+   * @param nbRows the maximum number of rows that can be returned
+   * @param metric the metric name
+   * @return the next batch of rows, or null if the scan is done.
+   */
+  public synchronized Result[] nextRows(int nbRows, String metric)
+  throws IOException {
+    preCondition();
+    boolean prefetchingEnabled = getOriginalScan().getServerPrefetching();
+    int limit = this.getOriginalScan().getBatch();
+    ScanResult scanResult;
+    // if we have a prefetched result, then use it
+    if (prefetchingEnabled && prefetchScanFuture != null) {
+      try {
+        scanResult = prefetchScanFuture.get();
+        prefetchScanFuture = null;
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      } catch (ExecutionException e) {
+        throw new IOException(e);
+      }
+      if (scanResult.isException) {
+        throw scanResult.ioException;
+      }
+    }
+    // if there are no prefetched results, then perform the scan inline
+    else {
+      ScanPrefetcher scanFetch = new ScanPrefetcher(nbRows, limit, metric);
+      scanResult = scanFetch.call();
+    }
+
+    if (scanResult.isException) {
+      throw scanResult.ioException;
+    }
+
+    // schedule a background prefetch for the next result if prefetch is
+    // enabled on scans
+    boolean scanDone =
+      (scanResult.outResults == null || scanResult.outResults.length == 0);
+    if (prefetchingEnabled && !scanDone) {
+      ScanPrefetcher callable = new ScanPrefetcher(nbRows, limit, metric);
+      prefetchScanFuture = HRegionServer.scanPrefetchThreadPool.submit(callable);
+    }
+    if (!scanDone) {
+      rowReadCnt.addAndGet(scanResult.outResults.length);
+    }
+    return scanResult.outResults == null ||
+        (isFilterDone() && scanResult.outResults.length == 0) ?
+        null : scanResult.outResults;
+  }
+
+  /**
+   * This is used by Gets & unit tests, whereas nextRows() is
+   * used by Scans
+   */
+  @Override
+  public synchronized boolean next(List<KeyValue> outResults, int limit,
+      String metric) throws IOException {
+    preCondition();
+    boolean returnResult;
+    if (outResults.isEmpty()) {
+       // Usually outResults is empty. This is true when next is called
+       // to handle a scan or get operation.
+      returnResult = nextInternal(outResults, limit, metric);
+    } else {
+      List<KeyValue> tmpList = new ArrayList<KeyValue>();
+      returnResult = nextInternal(tmpList, limit, metric);
+      outResults.addAll(tmpList);
+    }
+    rowReadCnt.incrementAndGet();
+    resetFilters();
+    if (isFilterDone()) {
+      return false;
+    }
+    return returnResult;
+  }
+
+  @Override
+  public boolean next(List<KeyValue> outResults)
+      throws IOException {
+    // apply the batching limit by default
+    return next(outResults, batch, null);
+  }
+
+  @Override
+  public boolean next(List<KeyValue> outResults, String metric)
+      throws IOException {
+    // apply the batching limit by default
+    return next(outResults, batch, metric);
+  }
+
+  /*
+   * @return true if the filter rules that the scanner is done.
+   */
+  private boolean isFilterDone() {
+    return this.filter != null && this.filter.filterAllRemaining();
+  }
+
+  /**
+   * @param results empty list in which results will be stored
+   */
+  private boolean nextInternal(List<KeyValue> results, int limit, String metric)
+      throws IOException {
+
+    if (!results.isEmpty()) {
+      throw new IllegalArgumentException("First parameter should be an empty list");
+    }
+
+    boolean partialRow = getOriginalScan().isPartialRow();
+    long maxResponseSize = getOriginalScan().getMaxResponseSize();
+    while (true) {
+      byte [] currentRow = peekRow();
+      if (isStopRow(currentRow)) {
+        if (filter != null && filter.hasFilterRow()) {
+          filter.filterRow(results);
+        }
+        if (filter != null && filter.filterRow()) {
+          results.clear();
+        }
+
+        return false;
+      } else if (filterRowKey(currentRow)) {
+        nextRow(currentRow);
+        results.clear();
+      } else {
+        byte [] nextRow;
+        do {
+          this.storeHeap.next(results, limit - results.size(), metric);
+          if (limit > 0 && results.size() == limit) {
+            if (this.filter != null && filter.hasFilterRow())
+              throw new IncompatibleFilterException(
+                "Filter with filterRow(List<KeyValue>) incompatible with scan with limit!");
+            return true; // more rows are expected, but we are limited in how many we can return.
+          }
+          // this guarantees that we still complete the entire row if
+          // currentPartialResponseSize exceeds the maxResponseSize.
+          if (partialRow && getOriginalScan().getCurrentPartialResponseSize()
+               >= maxResponseSize) {
+            return true;
+          }
+        } while (Bytes.equals(currentRow, nextRow = peekRow()));
+
+        final boolean stopRow = isStopRow(nextRow);
+
+        // now that we have an entire row, let's process it with the filters:
+
+        // first filter with the filterRow(List)
+        if (filter != null && filter.hasFilterRow()) {
+          filter.filterRow(results);
+        }
+
+        if (results.isEmpty() || filterRow()) {
+          nextRow(currentRow);
+          results.clear();
+
+          // This row was totally filtered out, if this is NOT the last row,
+          // we should continue on.
+
+          if (!stopRow) continue;
+        }
+        return !stopRow;
+      }
+    }
+  }
+
+  private boolean filterRow() {
+    return filter != null
+        && filter.filterRow();
+  }
+  private boolean filterRowKey(byte[] row) {
+    return filter != null
+        && filter.filterRowKey(row, 0, row.length);
+  }
+
+  protected void nextRow(byte [] currentRow) throws IOException {
+    while (Bytes.equals(currentRow, peekRow())) {
+      this.storeHeap.next(MOCKED_LIST);
+    }
+    resetFilters();
+  }
+
+  private byte[] peekRow() {
+    KeyValue kv = this.storeHeap.peek();
+    return kv == null ? null : kv.getRow();
+  }
+
+  private boolean isStopRow(byte [] currentRow) {
+    return currentRow == null ||
+        (stopRow != null &&
+        comparator.compareRows(stopRow, 0, stopRow.length,
+            currentRow, 0, currentRow.length) <= isScan);
+  }
+
+  @Override
+  public synchronized void close() {
+    if (storeHeap != null) {
+      storeHeap.close();
+      storeHeap = null;
+    }
+    // no need to synchronize here.
+    scannerReadPoints.remove(this);
+    this.filterClosed = true;
+  }
+
+  KeyValueHeap getStoreHeapForTesting() {
+    return storeHeap;
+  }
+
+  /**
+   * Get the original scan object that was used to create this internal one
+   * @return original scan object... used for debug output
+   */
+  public Scan getOriginalScan() {
+    return originalScan;
+  }
+}
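
nextRows() above implements a single-slot prefetch pipeline: drain the Future left by the previous call, then submit one new ScanPrefetcher so the next batch is computed while the client processes this one. The same pattern in miniature, using only java.util.concurrent (illustrative; not HBase API):

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    // Single-slot prefetch: batch N+1 is computed in the background while
    // the caller works on batch N.
    public class PrefetchingCounter {
      private final ExecutorService pool = Executors.newSingleThreadExecutor();
      private Future<int[]> prefetchFuture = null;
      private int cursor = 0;

      public synchronized int[] nextBatch(final int nbRows) throws Exception {
        int[] batch;
        if (prefetchFuture != null) {
          batch = prefetchFuture.get();  // drain the previous prefetch
          prefetchFuture = null;
        } else {
          batch = compute(nbRows);       // first call: compute inline
        }
        // schedule the background prefetch for the next call
        prefetchFuture = pool.submit(new Callable<int[]>() {
          public int[] call() {
            return compute(nbRows);
          }
        });
        return batch;
      }

      private int[] compute(int nbRows) {
        int[] out = new int[nbRows];
        for (int i = 0; i < nbRows; i++) {
          out[i] = cursor++;
        }
        return out;
      }
    }

As in RegionScanner, correctness relies on there being at most one outstanding task: Future.get() provides the happens-before edge, so the shared cursor never sees concurrent access.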

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1460199&r1=1460198&r2=1460199&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Sat Mar 23 18:18:39 2013
@@ -392,8 +392,8 @@ public class Store extends SchemaConfigu
     }
     // initialize the thread pool for opening store files in parallel..
     ThreadPoolExecutor storeFileOpenerThreadPool =
-      this.region.getStoreFileOpenAndCloseThreadPool("StoreFileOpenerThread-" +
-          this.family.getNameAsString());
+        StoreThreadUtils.getStoreFileOpenAndCloseThreadPool("StoreFileOpenerThread-" +
+          this.family.getNameAsString(), this.getHRegionInfo(), this.conf);
     CompletionService<StoreFile> completionService =
       new ExecutorCompletionService<StoreFile>(storeFileOpenerThreadPool);
 
@@ -587,9 +587,9 @@ public class Store extends SchemaConfigu
       storefiles = ImmutableList.of();
       if (!result.isEmpty()) {
         // initialize the thread pool for closing store files in parallel.
-        ThreadPoolExecutor storeFileCloserThreadPool = this.region
-            .getStoreFileOpenAndCloseThreadPool("StoreFileCloserThread-"
-                + this.family.getNameAsString());
+        ThreadPoolExecutor storeFileCloserThreadPool =
+            StoreThreadUtils.getStoreFileOpenAndCloseThreadPool("StoreFileCloserThread-"
+                + this.family.getNameAsString(), this.getHRegionInfo(), this.conf);
 
         // close each store file in parallel
         CompletionService<Void> completionService =

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreThreadUtils.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreThreadUtils.java?rev=1460199&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreThreadUtils.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreThreadUtils.java Sat Mar 23 18:18:39 2013
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.util.Threads;
+
+/**
+ * Thread utilities for creating the thread pools used to open and
+ * close stores and store files in parallel.
+ */
+public class StoreThreadUtils {
+  public static ThreadPoolExecutor getStoreOpenAndCloseThreadPool(
+      final String threadNamePrefix, HRegionInfo regionInfo, Configuration conf) {
+    int numStores = Math.max(1, regionInfo.getTableDesc().families.size());
+    int maxThreads = Math.min(numStores,
+        conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
+            HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX));
+    return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
+  }
+
+  public static ThreadPoolExecutor getStoreFileOpenAndCloseThreadPool(
+      final String threadNamePrefix, HRegionInfo regionInfo, Configuration conf) {
+    int numStores = Math.max(1, regionInfo.getTableDesc().families.size());
+    int maxThreads = Math.max(1,
+        conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
+            HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX)
+            / numStores);
+    return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
+  }
+
+  private static ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
+      final String threadNamePrefix) {
+    ThreadPoolExecutor openAndCloseThreadPool = Threads
+        .getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
+            new ThreadFactory() {
+              private int count = 1;
+
+              public Thread newThread(Runnable r) {
+                Thread t = new Thread(r, threadNamePrefix + "-" + count++);
+                t.setDaemon(true);
+                return t;
+              }
+            });
+    return openAndCloseThreadPool;
+  }
+}
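
The two sizing rules differ on purpose: the store-level pool takes min(numStores, cap), while each store's file-level pool takes max(1, cap / numStores), so the per-region thread count stays near the configured cap even when every store opens its files in parallel. A worked example with a hypothetical cap (the real value comes from HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX):

    public class PoolSizingExample {
      public static void main(String[] args) {
        int cap = 8;       // hypothetical configured maximum
        int numStores = 3; // column families in the table
        int storeThreads = Math.min(numStores, cap);        // 3
        int filesPerStore = Math.max(1, cap / numStores);   // 2
        System.out.println("store pool: " + storeThreads
            + ", store-file pool per store: " + filesPerStore);
        // At most 3 stores open/close in parallel, each with 2 file
        // threads: roughly 3 * 2 = 6 <= cap concurrent file operations.
      }
    }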

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=1460199&r1=1460198&r2=1460199&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java Sat Mar 23 18:18:39 2013
@@ -58,7 +58,6 @@ import org.apache.hadoop.hbase.filter.Fi
 import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.PrefixFilter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
-import org.apache.hadoop.hbase.regionserver.HRegion.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
@@ -1417,7 +1416,7 @@ public class TestHRegion extends HBaseTe
     region.put(put);
 
     Scan scan = null;
-    HRegion.RegionScanner is = null;
+    RegionScanner is = null;
 
     //Testing to see how many scanners that is produced by getScanner, starting
     //with known number, 2 - current = 1

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java?rev=1460199&r1=1460198&r2=1460199&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java Sat Mar 23 18:18:39 2013
@@ -97,7 +97,7 @@ public class TestScanWithBloomError {
     LOG.info("Scanning column set: " + Arrays.toString(colSet));
     Scan scan = new Scan(ROW_BYTES, ROW_BYTES);
     addColumnSetToScan(scan, colSet);
-    HRegion.RegionScanner scanner = (HRegion.RegionScanner)
+    RegionScanner scanner = (RegionScanner)
         region.getScanner(scan);
     KeyValueHeap storeHeap = scanner.getStoreHeapForTesting();
     assertEquals(0, storeHeap.getHeap().size());

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java?rev=1460199&r1=1460198&r2=1460199&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java Sat Mar 23 18:18:39 2013
@@ -134,7 +134,7 @@ public class TestWideScanner extends HBa
 
         // trigger ChangedReadersObservers
         Iterator<KeyValueScanner> scanners =
-            ((HRegion.RegionScanner)s).storeHeap.getHeap().iterator();
+            ((RegionScanner)s).storeHeap.getHeap().iterator();
         while (scanners.hasNext()) {
             StoreScanner ss = (StoreScanner)scanners.next();
             ss.updateReaders();