You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2013/03/23 19:18:39 UTC
svn commit: r1460199 - in /hbase/branches/0.89-fb/src:
main/java/org/apache/hadoop/hbase/regionserver/
test/java/org/apache/hadoop/hbase/regionserver/
Author: liyin
Date: Sat Mar 23 18:18:39 2013
New Revision: 1460199
URL: http://svn.apache.org/r1460199
Log:
[HBASE-8185] Pulling HRegionScanner code outside the HRegion class.
Author: manukranthk
Summary: This diff is to decompose the diff D708638 into few smaller diffs for a quicker and easier review process.
Test Plan: Since this is a refactoring change, testing the code change by running MR Unit tests.
Reviewers: liyintang, rshroff, aaiyer
Reviewed By: liyintang
CC: hbase-eng@, erling, arice
Differential Revision: https://phabricator.fb.com/D743249
Task ID: 2103689
Added:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionContext.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreThreadUtils.java
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1460199&r1=1460198&r2=1460199&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sat Mar 23 18:18:39 2013
@@ -91,8 +91,6 @@ import org.apache.hadoop.hbase.ipc.HRegi
import org.apache.hadoop.hbase.metrics.RequestMetrics;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
-import org.apache.hadoop.hbase.regionserver.HRegion.RegionScanner.ScanPrefetcher;
-import org.apache.hadoop.hbase.regionserver.HRegion.RegionScanner.ScanResult;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
@@ -587,8 +585,9 @@ public class HRegion implements HeapSize
if (!families.isEmpty()) {
// initialize the thread pool for opening stores in parallel.
ThreadPoolExecutor storeOpenerThreadPool =
- getStoreOpenAndCloseThreadPool(
- "StoreOpenerThread-" + this.regionInfo.getRegionNameAsString());
+ StoreThreadUtils.getStoreOpenAndCloseThreadPool(
+ "StoreOpenerThread-" + this.regionInfo.getRegionNameAsString(),
+ this.getRegionInfo(), this.conf);
CompletionService<Store> completionService =
new ExecutorCompletionService<Store>(storeOpenerThreadPool);
@@ -857,8 +856,8 @@ public class HRegion implements HeapSize
if (!stores.isEmpty()) {
// initialize the thread pool for closing stores in parallel.
ThreadPoolExecutor storeCloserThreadPool =
- getStoreOpenAndCloseThreadPool("StoreCloserThread-"
- + this.regionInfo.getRegionNameAsString());
+ StoreThreadUtils.getStoreOpenAndCloseThreadPool("StoreCloserThread-"
+ + this.regionInfo.getRegionNameAsString(), this.getRegionInfo(), this.conf);
CompletionService<ImmutableList<StoreFile>> completionService =
new ExecutorCompletionService<ImmutableList<StoreFile>>(
storeCloserThreadPool);
@@ -902,41 +901,6 @@ public class HRegion implements HeapSize
}
}
- protected ThreadPoolExecutor getStoreOpenAndCloseThreadPool(
- final String threadNamePrefix) {
- int numStores = Math.max(1, this.regionInfo.getTableDesc().families.size());
- int maxThreads = Math.min(numStores,
- conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
- HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX));
- return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
- }
-
- protected ThreadPoolExecutor getStoreFileOpenAndCloseThreadPool(
- final String threadNamePrefix) {
- int numStores = Math.max(1, this.regionInfo.getTableDesc().families.size());
- int maxThreads = Math.max(1,
- conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
- HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX)
- / numStores);
- return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
- }
-
- private ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
- final String threadNamePrefix) {
- ThreadPoolExecutor openAndCloseThreadPool = Threads
- .getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
- new ThreadFactory() {
- private int count = 1;
-
- public Thread newThread(Runnable r) {
- Thread t = new Thread(r, threadNamePrefix + "-" + count++);
- t.setDaemon(true);
- return t;
- }
- });
- return openAndCloseThreadPool;
- }
-
/**
* @return True if its worth doing a flush before we put up the close flag.
*/
@@ -1686,7 +1650,10 @@ public class HRegion implements HeapSize
}
protected InternalScanner instantiateInternalScanner(Scan scan, List<KeyValueScanner> additionalScanners) throws IOException {
- return new RegionScanner(scan, additionalScanners);
+ // return new RegionScanner(scan, additionalScanners);
+ RegionContext regionContext = new RegionContext(stores, scannerReadPoints,
+ comparator, mvcc, closing, closed, regionInfo, rowReadCnt);
+ return new RegionScanner(scan, additionalScanners, regionContext);
}
/*
@@ -2997,408 +2964,6 @@ public class HRegion implements HeapSize
return this.tableDir;
}
- /**
- * RegionScanner is an iterator through a bunch of rows in an HRegion.
- * <p>
- * It is used to combine scanners from multiple Stores (aka column families).
- */
- class RegionScanner implements InternalScanner {
- // Package local for testability
- KeyValueHeap storeHeap = null;
- private final byte [] stopRow;
- private Filter filter;
- private final int batch;
- private int isScan;
- private boolean filterClosed = false;
- private long readPt;
- private Scan originalScan;
- private Future<ScanResult> prefetchScanFuture = null;
-
- RegionScanner(Scan scan, List<KeyValueScanner> additionalScanners) throws IOException {
- //DebugPrint.println("HRegionScanner.<init>");
-
- this.originalScan = scan;
-
- this.filter = scan.getFilter();
- this.batch = scan.getBatch();
- if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
- this.stopRow = null;
- } else {
- this.stopRow = scan.getStopRow();
- }
- // If we are doing a get, we want to be [startRow,endRow] normally
- // it is [startRow,endRow) and if startRow=endRow we get nothing.
- this.isScan = scan.isGetScan() ? -1 : 0;
-
- // synchronize on scannerReadPoints so that nobody calculates
- // getSmallestReadPoint, before scannerReadPoints is updated.
- synchronized(scannerReadPoints) {
- this.readPt = MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
- scannerReadPoints.put(this, this.readPt);
- }
-
- List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
- if (additionalScanners != null) {
- scanners.addAll(additionalScanners);
- }
-
- for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
- scan.getFamilyMap().entrySet()) {
- Store store = stores.get(entry.getKey());
- StoreScanner scanner = store.getScanner(scan, entry.getValue());
- scanners.add(scanner);
- }
- this.storeHeap = new KeyValueHeap(scanners, comparator);
- }
-
- RegionScanner(Scan scan) throws IOException {
- this(scan, null);
- }
-
- /**
- * Reset both the filter and the old filter.
- */
- protected void resetFilters() {
- if (filter != null) {
- filter.reset();
- }
- }
-
- @Override
- public boolean next(List<KeyValue> outResults, int limit)
- throws IOException {
- return next(outResults, limit, null);
- }
-
- private void preCondition() throws IOException{
- if (this.filterClosed) {
- throw new UnknownScannerException("Scanner was closed (timed out?) " +
- "after we renewed it. Could be caused by a very slow scanner " +
- "or a lengthy garbage collection");
- }
- if (closing.get() || closed.get()) {
- close();
- throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
- " is closing=" + closing.get() + " or closed=" + closed.get());
- }
-
- // This could be a new thread from the last time we called next().
- MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);
- }
-
- /**
- * This class abstracts the results of a single scanner's result. It tracks
- * the list of Result objects if the pre-fetch next was successful, and
- * tracks the exception if the next failed.
- */
- class ScanResult {
- final boolean isException;
- IOException ioException = null;
- Result[] outResults;
- boolean moreRows;
-
- public ScanResult(IOException ioException) {
- isException = true;
- this.ioException = ioException;
- }
-
- public ScanResult(boolean moreRows, Result[] outResults) {
- isException = false;
- this.moreRows = moreRows;
- this.outResults = outResults;
- }
- }
-
- /**
- * This Callable abstracts calling a pre-fetch next. This is called on a
- * threadpool. It makes a pre-fetch next call with the same parameters as
- * the incoming next call. Note that the number of rows to return (nbRows)
- * and/or the memory size for the result is the same as the previous call if
- * pre-fetching is enabled. If these params change dynamically, they will
- * take effect in the subsequent iteration.
- */
- class ScanPrefetcher implements Callable<ScanResult> {
- int nbRows;
- int limit;
- String metric;
-
- ScanPrefetcher(int nbRows, int limit, String metric) {
- this.nbRows = nbRows;
- this.limit = limit;
- this.metric = metric;
- }
-
- public ScanResult call() {
- ScanResult scanResult = null;
- List<Result> outResults = new ArrayList<Result>();
- List<KeyValue> tmpList = new ArrayList<KeyValue>();
- int currentNbRows = 0;
- boolean moreRows = true;
- try {
- // This is necessary b/c partialResponseSize is not serialized through
- // RPC
- getOriginalScan().setCurrentPartialResponseSize(0);
- int maxResponseSize = getOriginalScan().getMaxResponseSize();
- do {
- moreRows = nextInternal(tmpList, limit, metric);
- if (!tmpList.isEmpty()) {
- currentNbRows++;
- if (outResults != null) {
- outResults.add(new Result(tmpList));
- tmpList.clear();
- }
- }
- resetFilters();
- if (isFilterDone()) {
- break;
- }
-
- // While Condition
- // 1. respect maxResponseSize and nbRows whichever comes first,
- // 2. recheck the currentPartialResponseSize is to catch the case
- // where maxResponseSize is saturated and partialRow == false
- // since we allow this case valid in the nextInternal() layer
- } while (moreRows
- && (getOriginalScan().getCurrentPartialResponseSize() <
- maxResponseSize && currentNbRows < nbRows));
- scanResult = new ScanResult(moreRows,
- outResults.toArray(new Result[0]));
- } catch (IOException e) {
- // we should queue the exception as the result so that we can return
- // this when the result is asked for
- scanResult = new ScanResult(e);
- }
- return scanResult;
- }
- }
-
- /**
- * A method to return all the rows that can fit in the response size.
- * it respects the two stop conditions:
- * 1) scan.getMaxResponseSize
- * 2) scan.getCaching() (which is nbRows)
- * the loop breaks whoever comes first.
- * This is only used by scan(), not get()
- * @param outResults a list of rows to return
- * @param nbRows the number of rows that can be returned at most
- * @param metric the metric name
- * @return true if there are more rows to fetch.
- *
- * This is used by Scans.
- */
- public synchronized Result[] nextRows(int nbRows, String metric)
- throws IOException {
- preCondition();
- boolean prefetchingEnabled = getOriginalScan().getServerPrefetching();
- int limit = this.getOriginalScan().getBatch();
- ScanResult scanResult;
- // if we have a prefetched result, then use it
- if (prefetchingEnabled && prefetchScanFuture != null) {
- try {
- scanResult = prefetchScanFuture.get();
- prefetchScanFuture = null;
- } catch (InterruptedException e) {
- throw new IOException(e);
- } catch (ExecutionException e) {
- throw new IOException(e);
- }
- if (scanResult.isException) {
- throw scanResult.ioException;
- }
- }
- // if there are no prefetched results, then preform the scan inline
- else {
- ScanPrefetcher scanFetch = new ScanPrefetcher(nbRows, limit, metric);
- scanResult = scanFetch.call();
- }
-
- if (scanResult.isException) {
- throw scanResult.ioException;
- }
-
- // schedule a background prefetch for the next result if prefetch is
- // enabled on scans
- boolean scanDone =
- (scanResult.outResults == null || scanResult.outResults.length == 0);
- if (prefetchingEnabled && !scanDone) {
- ScanPrefetcher callable = new ScanPrefetcher(nbRows, limit, metric);
- prefetchScanFuture = HRegionServer.scanPrefetchThreadPool.submit(callable);
- }
- if (!scanDone) {
- rowReadCnt.addAndGet(scanResult.outResults.length);
- }
- return scanResult.outResults == null ||
- (isFilterDone() && scanResult.outResults.length == 0) ?
- null : scanResult.outResults;
- }
-
- /**
- * This is used by Gets & unit tests, whereas nextRows() is
- * used by Scans
- */
- @Override
- public synchronized boolean next(List<KeyValue> outResults, int limit,
- String metric) throws IOException {
- preCondition();
- boolean returnResult;
- if (outResults.isEmpty()) {
- // Usually outResults is empty. This is true when next is called
- // to handle scan or get operation.
- returnResult = nextInternal(outResults, limit, metric);
- } else {
- List<KeyValue> tmpList = new ArrayList<KeyValue>();
- returnResult = nextInternal(tmpList, limit, metric);
- outResults.addAll(tmpList);
- }
- rowReadCnt.incrementAndGet();
- resetFilters();
- if (isFilterDone()) {
- return false;
- }
- return returnResult;
- }
-
- @Override
- public boolean next(List<KeyValue> outResults)
- throws IOException {
- // apply the batching limit by default
- return next(outResults, batch, null);
- }
-
- @Override
- public boolean next(List<KeyValue> outResults, String metric)
- throws IOException {
- // apply the batching limit by default
- return next(outResults, batch, metric);
- }
-
- /*
- * @return True if a filter rules the scanner is over, done.
- */
- private boolean isFilterDone() {
- return this.filter != null && this.filter.filterAllRemaining();
- }
-
- /**
- * @param results empty list in which results will be stored
- */
- private boolean nextInternal(List<KeyValue> results, int limit, String metric)
- throws IOException {
-
- if (!results.isEmpty()) {
- throw new IllegalArgumentException("First parameter should be an empty list");
- }
-
- boolean partialRow = getOriginalScan().isPartialRow();
- long maxResponseSize = getOriginalScan().getMaxResponseSize();
-
- while (true) {
- byte [] currentRow = peekRow();
- if (isStopRow(currentRow)) {
- if (filter != null && filter.hasFilterRow()) {
- filter.filterRow(results);
- }
- if (filter != null && filter.filterRow()) {
- results.clear();
- }
-
- return false;
- } else if (filterRowKey(currentRow)) {
- nextRow(currentRow);
- results.clear();
- } else {
- byte [] nextRow;
- do {
- this.storeHeap.next(results, limit - results.size(), metric);
- if (limit > 0 && results.size() == limit) {
- if (this.filter != null && filter.hasFilterRow())
- throw new IncompatibleFilterException(
- "Filter with filterRow(List<KeyValue>) incompatible with scan with limit!");
- return true; // we are expecting more yes, but also limited to how many we can return.
- }
- // this gaurantees that we still complete the entire row if
- // currentPartialResponseSize exceeds the maxResponseSize.
- if (partialRow && getOriginalScan().getCurrentPartialResponseSize()
- >= maxResponseSize) {
- return true;
- }
- } while (Bytes.equals(currentRow, nextRow = peekRow()));
-
- final boolean stopRow = isStopRow(nextRow);
-
- // now that we have an entire row, lets process with a filters:
-
- // first filter with the filterRow(List)
- if (filter != null && filter.hasFilterRow()) {
- filter.filterRow(results);
- }
-
- if (results.isEmpty() || filterRow()) {
- nextRow(currentRow);
- results.clear();
-
- // This row was totally filtered out, if this is NOT the last row,
- // we should continue on.
-
- if (!stopRow) continue;
- }
- return !stopRow;
- }
- }
- }
-
- private boolean filterRow() {
- return filter != null
- && filter.filterRow();
- }
- private boolean filterRowKey(byte[] row) {
- return filter != null
- && filter.filterRowKey(row, 0, row.length);
- }
-
- protected void nextRow(byte [] currentRow) throws IOException {
- while (Bytes.equals(currentRow, peekRow())) {
- this.storeHeap.next(MOCKED_LIST);
- }
- resetFilters();
- }
-
- private byte[] peekRow() {
- KeyValue kv = this.storeHeap.peek();
- return kv == null ? null : kv.getRow();
- }
-
- private boolean isStopRow(byte [] currentRow) {
- return currentRow == null ||
- (stopRow != null &&
- comparator.compareRows(stopRow, 0, stopRow.length,
- currentRow, 0, currentRow.length) <= isScan);
- }
-
- @Override
- public synchronized void close() {
- if (storeHeap != null) {
- storeHeap.close();
- storeHeap = null;
- }
- // no need to sychronize here.
- scannerReadPoints.remove(this);
- this.filterClosed = true;
- }
-
- KeyValueHeap getStoreHeapForTesting() {
- return storeHeap;
- }
-
- /**
- * Get the original scan object that was used to create this internal one
- * @return original scan object... used for debug output
- */
- public Scan getOriginalScan() {
- return originalScan;
- }
- }
-
// Utility methods
/**
* A utility method to create new instances of HRegion based on the
@@ -4283,7 +3848,7 @@ public class HRegion implements HeapSize
/**
* A mocked list implementaion - discards all updates.
*/
- private static final List<KeyValue> MOCKED_LIST = new AbstractList<KeyValue>() {
+ public static final List<KeyValue> MOCKED_LIST = new AbstractList<KeyValue>() {
@Override
public void add(int index, KeyValue element) {
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1460199&r1=1460198&r2=1460199&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Sat Mar 23 18:18:39 2013
@@ -2635,8 +2635,8 @@ public class HRegionServer implements HR
long scannerId = (Long) params[0];
String scannerName = String.valueOf(scannerId);
InternalScanner s = regionServer.scanners.get(scannerName);
- if (s != null && s instanceof HRegion.RegionScanner) {
- res.put("scan", ((HRegion.RegionScanner)s).getOriginalScan().toMap(DEFAULT_MAX_COLS));
+ if (s != null && s instanceof RegionScanner) {
+ res.put("scan", ((RegionScanner)s).getOriginalScan().toMap(DEFAULT_MAX_COLS));
}
if (params.length > 1) {
@@ -2653,7 +2653,7 @@ public class HRegionServer implements HR
String scannerName = String.valueOf(scannerId);
// HRegionServer only deals with Region Scanner,
// thus, we just typecast directly
- HRegion.RegionScanner s = (HRegion.RegionScanner)this.scanners.get(scannerName);
+ RegionScanner s = (RegionScanner)this.scanners.get(scannerName);
if (s == null) {
throw new UnknownScannerException("Name: " + scannerName);
}
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionContext.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionContext.java?rev=1460199&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionContext.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionContext.java Sat Mar 23 18:18:39 2013
@@ -0,0 +1,115 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+/**
+ * RegionContext gives a container for all the member variables in the
+ * HRegion class which the RegionScanner needs in its constructor. Earlier,
+ * since the RegionScanner was an inner class to HRegion, these members were
+ * accessible. Now that the RegionScanner is an external class, these variables
+ * are packaged into RegionContext.
+ *
+ * @author manukranthk
+ *
+ */
+public class RegionContext {
+ final private Map<byte[], Store> stores;
+ final private ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
+ final private KeyValue.KVComparator comparator;
+ final private MultiVersionConsistencyControl mvcc;
+ final private AtomicBoolean closing;
+ final private AtomicBoolean closed;
+ final private HRegionInfo regionInfo;
+ final private AtomicInteger rowReadCnt;
+
+ public RegionContext(Map<byte[], Store> stores,
+ ConcurrentHashMap<RegionScanner, Long> scannerReadPoints,
+ KeyValue.KVComparator comparator,
+ MultiVersionConsistencyControl mvcc,
+ AtomicBoolean closing,
+ AtomicBoolean closed,
+ HRegionInfo regionInfo,
+ AtomicInteger rowReadCnt) {
+ this.stores = stores;
+ this.scannerReadPoints = scannerReadPoints;
+ this.comparator = comparator;
+ this.mvcc = mvcc;
+ this.closing = closing;
+ this.closed = closed;
+ this.regionInfo = regionInfo;
+ this.rowReadCnt = rowReadCnt;
+ }
+
+ /*
+ * Constructor to create a region context for
+ * opening a read only RegionScanner
+ */
+ public RegionContext(Map<byte[], Store> stores, HRegionInfo regionInfo) {
+ this.stores = stores;
+ this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
+ this.comparator = regionInfo.getComparator();
+ this.mvcc = new MultiVersionConsistencyControl();
+ this.mvcc.setThreadReadPoint(Long.MAX_VALUE);
+ this.closing = new AtomicBoolean(false);
+ this.closed = new AtomicBoolean(false);
+ this.regionInfo = regionInfo;
+ this.rowReadCnt = new AtomicInteger(0);
+ }
+
+ public Map<byte[], Store> getStores() {
+ return this.stores;
+ }
+
+ public ConcurrentHashMap<RegionScanner, Long> getScannerReadPoints() {
+ return this.scannerReadPoints;
+ }
+
+ public KeyValue.KVComparator getComparator() {
+ return this.comparator;
+ }
+
+ public MultiVersionConsistencyControl getmvcc() {
+ return this.mvcc;
+ }
+
+ public AtomicBoolean getClosing() {
+ return this.closing;
+ }
+
+ public AtomicBoolean getClosed() {
+ return this.closed;
+ }
+
+ public HRegionInfo getRegionInfo() {
+ return this.regionInfo;
+ }
+
+ public AtomicInteger getRowReadCnt() {
+ return this.rowReadCnt;
+ }
+}
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java?rev=1460199&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java Sat Mar 23 18:18:39 2013
@@ -0,0 +1,449 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * RegionScanner is an iterator through a bunch of rows in an HRegion.
+ * <p>
+ * It is used to combine scanners from multiple Stores (aka column families).
+ */
+public class RegionScanner implements InternalScanner {
+//Package local for testability
+ KeyValueHeap storeHeap = null;
+ private final byte [] stopRow;
+ private Filter filter;
+ private final int batch;
+ private int isScan;
+ private boolean filterClosed = false;
+ private long readPt;
+ private Scan originalScan;
+ private Future<ScanResult> prefetchScanFuture = null;
+ private Map<byte[], Store> stores;
+ private KeyValue.KVComparator comparator;
+ private ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
+ private MultiVersionConsistencyControl mvcc;
+ private AtomicBoolean closing;
+ private AtomicBoolean closed;
+ private HRegionInfo regionInfo;
+ private AtomicInteger rowReadCnt;
+ private final List<KeyValue> MOCKED_LIST = HRegion.MOCKED_LIST;
+
+ public RegionScanner(Scan scan, List<KeyValueScanner> additionalScanners, RegionContext regionContext)
+ throws IOException {
+ this.stores = regionContext.getStores();
+ this.scannerReadPoints = regionContext.getScannerReadPoints();
+ this.comparator = regionContext.getComparator();
+ this.mvcc = regionContext.getmvcc();
+ this.closing = regionContext.getClosing();
+ this.closed = regionContext.getClosed();
+ this.regionInfo = regionContext.getRegionInfo();
+ this.rowReadCnt = regionContext.getRowReadCnt();
+ this.originalScan = scan;
+
+ this.filter = scan.getFilter();
+ this.batch = scan.getBatch();
+ if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
+ this.stopRow = null;
+ } else {
+ this.stopRow = scan.getStopRow();
+ }
+ // If we are doing a get, we want to be [startRow,endRow] normally
+ // it is [startRow,endRow) and if startRow=endRow we get nothing.
+ this.isScan = scan.isGetScan() ? -1 : 0;
+
+ // synchronize on scannerReadPoints so that nobody calculates
+ // getSmallestReadPoint, before scannerReadPoints is updated.
+ synchronized(scannerReadPoints) {
+ this.readPt = MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
+ scannerReadPoints.put(this, this.readPt);
+ }
+
+ List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
+ if (additionalScanners != null) {
+ scanners.addAll(additionalScanners);
+ }
+
+ for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
+ scan.getFamilyMap().entrySet()) {
+ Store store = stores.get(entry.getKey());
+ StoreScanner scanner = store.getScanner(scan, entry.getValue());
+ scanners.add(scanner);
+ }
+ this.storeHeap = new KeyValueHeap(scanners, comparator);
+ }
+
+ /**
+ * Reset both the filter and the old filter.
+ */
+ protected void resetFilters() {
+ if (filter != null) {
+ filter.reset();
+ }
+ }
+
+ @Override
+ public boolean next(List<KeyValue> outResults, int limit)
+ throws IOException {
+ return next(outResults, limit, null);
+ }
+
+ private void preCondition() throws IOException{
+ if (this.filterClosed) {
+ throw new UnknownScannerException("Scanner was closed (timed out?) " +
+ "after we renewed it. Could be caused by a very slow scanner " +
+ "or a lengthy garbage collection");
+ }
+ if (closing.get() || closed.get()) {
+ close();
+ throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
+ " is closing=" + closing.get() + " or closed=" + closed.get());
+ }
+
+ // This could be a new thread from the last time we called next().
+ MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);
+ }
+
+ /**
+ * This class abstracts the results of a single scanner's result. It tracks
+ * the list of Result objects if the pre-fetch next was successful, and
+ * tracks the exception if the next failed.
+ */
+ class ScanResult {
+ final boolean isException;
+ IOException ioException = null;
+ Result[] outResults;
+ boolean moreRows;
+
+ public ScanResult(IOException ioException) {
+ isException = true;
+ this.ioException = ioException;
+ }
+
+ public ScanResult(boolean moreRows, Result[] outResults) {
+ isException = false;
+ this.moreRows = moreRows;
+ this.outResults = outResults;
+ }
+ }
+
+ /**
+ * This Callable abstracts calling a pre-fetch next. This is called on a
+ * threadpool. It makes a pre-fetch next call with the same parameters as
+ * the incoming next call. Note that the number of rows to return (nbRows)
+ * and/or the memory size for the result is the same as the previous call if
+ * pre-fetching is enabled. If these params change dynamically, they will
+ * take effect in the subsequent iteration.
+ */
+ class ScanPrefetcher implements Callable<ScanResult> {
+ int nbRows;
+ int limit;
+ String metric;
+
+ ScanPrefetcher(int nbRows, int limit, String metric) {
+ this.nbRows = nbRows;
+ this.limit = limit;
+ this.metric = metric;
+ }
+
+ public ScanResult call() {
+ ScanResult scanResult = null;
+ List<Result> outResults = new ArrayList<Result>();
+ List<KeyValue> tmpList = new ArrayList<KeyValue>();
+ int currentNbRows = 0;
+ boolean moreRows = true;
+ try {
+ // This is necessary b/c partialResponseSize is not serialized through
+ // RPC
+ getOriginalScan().setCurrentPartialResponseSize(0);
+ int maxResponseSize = getOriginalScan().getMaxResponseSize();
+ do {
+ moreRows = nextInternal(tmpList, limit, metric);
+ if (!tmpList.isEmpty()) {
+ currentNbRows++;
+ if (outResults != null) {
+ outResults.add(new Result(tmpList));
+ tmpList.clear();
+ }
+ }
+ resetFilters();
+ if (isFilterDone()) {
+ break;
+ }
+
+ // While Condition
+ // 1. respect maxResponseSize and nbRows whichever comes first,
+ // 2. recheck the currentPartialResponseSize is to catch the case
+ // where maxResponseSize is saturated and partialRow == false
+ // since we allow this case valid in the nextInternal() layer
+ } while (moreRows
+ && (getOriginalScan().getCurrentPartialResponseSize() <
+ maxResponseSize && currentNbRows < nbRows));
+ scanResult = new ScanResult(moreRows,
+ outResults.toArray(new Result[0]));
+ } catch (IOException e) {
+ // we should queue the exception as the result so that we can return
+ // this when the result is asked for
+ scanResult = new ScanResult(e);
+ }
+ return scanResult;
+ }
+ }
+
+ /**
+ * A method to return all the rows that can fit in the response size.
+ * it respects the two stop conditions:
+ * 1) scan.getMaxResponseSize
+ * 2) scan.getCaching() (which is nbRows)
+ * the loop breaks whoever comes first.
+ * This is only used by scan(), not get()
+ * @param outResults a list of rows to return
+ * @param nbRows the number of rows that can be returned at most
+ * @param metric the metric name
+ * @return true if there are more rows to fetch.
+ *
+ * This is used by Scans.
+ */
+ public synchronized Result[] nextRows(int nbRows, String metric)
+ throws IOException {
+ preCondition();
+ boolean prefetchingEnabled = getOriginalScan().getServerPrefetching();
+ int limit = this.getOriginalScan().getBatch();
+ ScanResult scanResult;
+ // if we have a prefetched result, then use it
+ if (prefetchingEnabled && prefetchScanFuture != null) {
+ try {
+ scanResult = prefetchScanFuture.get();
+ prefetchScanFuture = null;
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ } catch (ExecutionException e) {
+ throw new IOException(e);
+ }
+ if (scanResult.isException) {
+ throw scanResult.ioException;
+ }
+ }
+ // if there are no prefetched results, then preform the scan inline
+ else {
+ ScanPrefetcher scanFetch = new ScanPrefetcher(nbRows, limit, metric);
+ scanResult = scanFetch.call();
+ }
+
+ // schedule a background prefetch for the next result if prefetch is
+ // enabled on scans
+ boolean scanDone =
+ (scanResult.outResults == null || scanResult.outResults.length == 0);
+ if (prefetchingEnabled && !scanDone) {
+ ScanPrefetcher callable = new ScanPrefetcher(nbRows, limit, metric);
+ prefetchScanFuture = HRegionServer.scanPrefetchThreadPool.submit(callable);
+ }
+ rowReadCnt.addAndGet(scanResult.outResults.length);
+ return scanResult.outResults == null ||
+ (isFilterDone() && scanResult.outResults.length == 0) ?
+ null : scanResult.outResults;
+ }
+
+ /**
+ * This is used by Gets & unit tests, whereas nextRows() is
+ * used by Scans
+ */
+ @Override
+ public synchronized boolean next(List<KeyValue> outResults, int limit,
+ String metric) throws IOException {
+ preCondition();
+ boolean returnResult;
+ if (outResults.isEmpty()) {
+ // Usually outResults is empty. This is true when next is called
+ // to handle scan or get operation.
+ returnResult = nextInternal(outResults, limit, metric);
+ } else {
+ List<KeyValue> tmpList = new ArrayList<KeyValue>();
+ returnResult = nextInternal(tmpList, limit, metric);
+ outResults.addAll(tmpList);
+ }
+ rowReadCnt.incrementAndGet();
+ resetFilters();
+ if (isFilterDone()) {
+ return false;
+ }
+ return returnResult;
+ }
+
+ @Override
+ public boolean next(List<KeyValue> outResults)
+ throws IOException {
+ // apply the batching limit by default
+ return next(outResults, batch, null);
+ }
+
+ @Override
+ public boolean next(List<KeyValue> outResults, String metric)
+ throws IOException {
+ // apply the batching limit by default
+ return next(outResults, batch, metric);
+ }
+
+ /*
+ * @return True if a filter rules the scanner is over, done.
+ */
+ private boolean isFilterDone() {
+ return this.filter != null && this.filter.filterAllRemaining();
+ }
+
+ /**
+ * @param results empty list in which results will be stored
+ */
+ private boolean nextInternal(List<KeyValue> results, int limit, String metric)
+ throws IOException {
+
+ if (!results.isEmpty()) {
+ throw new IllegalArgumentException("First parameter should be an empty list");
+ }
+
+ boolean partialRow = getOriginalScan().isPartialRow();
+ long maxResponseSize = getOriginalScan().getMaxResponseSize();
+ while (true) {
+ byte [] currentRow = peekRow();
+ if (isStopRow(currentRow)) {
+ if (filter != null && filter.hasFilterRow()) {
+ filter.filterRow(results);
+ }
+ if (filter != null && filter.filterRow()) {
+ results.clear();
+ }
+
+ return false;
+ } else if (filterRowKey(currentRow)) {
+ nextRow(currentRow);
+ results.clear();
+ } else {
+ byte [] nextRow;
+ do {
+ this.storeHeap.next(results, limit - results.size(), metric);
+ if (limit > 0 && results.size() == limit) {
+ if (this.filter != null && filter.hasFilterRow())
+ throw new IncompatibleFilterException(
+ "Filter with filterRow(List<KeyValue>) incompatible with scan with limit!");
+ return true; // we are expecting more yes, but also limited to how many we can return.
+ }
+ // this gaurantees that we still complete the entire row if
+ // currentPartialResponseSize exceeds the maxResponseSize.
+ if (partialRow && getOriginalScan().getCurrentPartialResponseSize()
+ >= maxResponseSize) {
+ return true;
+ }
+ } while (Bytes.equals(currentRow, nextRow = peekRow()));
+
+ final boolean stopRow = isStopRow(nextRow);
+
+ // now that we have an entire row, lets process with a filters:
+
+ // first filter with the filterRow(List)
+ if (filter != null && filter.hasFilterRow()) {
+ filter.filterRow(results);
+ }
+
+ if (results.isEmpty() || filterRow()) {
+ nextRow(currentRow);
+ results.clear();
+
+ // This row was totally filtered out, if this is NOT the last row,
+ // we should continue on.
+
+ if (!stopRow) continue;
+ }
+ return !stopRow;
+ }
+ }
+ }
+
+ private boolean filterRow() {
+ return filter != null
+ && filter.filterRow();
+ }
+ private boolean filterRowKey(byte[] row) {
+ return filter != null
+ && filter.filterRowKey(row, 0, row.length);
+ }
+
+ protected void nextRow(byte [] currentRow) throws IOException {
+ while (Bytes.equals(currentRow, peekRow())) {
+ this.storeHeap.next(MOCKED_LIST);
+ }
+ resetFilters();
+ }
+
+ private byte[] peekRow() {
+ KeyValue kv = this.storeHeap.peek();
+ return kv == null ? null : kv.getRow();
+ }
+
+ private boolean isStopRow(byte [] currentRow) {
+ return currentRow == null ||
+ (stopRow != null &&
+ comparator.compareRows(stopRow, 0, stopRow.length,
+ currentRow, 0, currentRow.length) <= isScan);
+ }
+
+ @Override
+ public synchronized void close() {
+ if (storeHeap != null) {
+ storeHeap.close();
+ storeHeap = null;
+ }
+ // no need to sychronize here.
+ scannerReadPoints.remove(this);
+ this.filterClosed = true;
+ }
+
+ KeyValueHeap getStoreHeapForTesting() {
+ return storeHeap;
+ }
+
+ /**
+ * Get the original scan object that was used to create this internal one
+ * @return original scan object... used for debug output
+ */
+ public Scan getOriginalScan() {
+ return originalScan;
+ }
+}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1460199&r1=1460198&r2=1460199&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Sat Mar 23 18:18:39 2013
@@ -392,8 +392,8 @@ public class Store extends SchemaConfigu
}
// initialize the thread pool for opening store files in parallel..
ThreadPoolExecutor storeFileOpenerThreadPool =
- this.region.getStoreFileOpenAndCloseThreadPool("StoreFileOpenerThread-" +
- this.family.getNameAsString());
+ StoreThreadUtils.getStoreFileOpenAndCloseThreadPool("StoreFileOpenerThread-" +
+ this.family.getNameAsString(), this.getHRegionInfo(), this.conf);
CompletionService<StoreFile> completionService =
new ExecutorCompletionService<StoreFile>(storeFileOpenerThreadPool);
@@ -587,9 +587,9 @@ public class Store extends SchemaConfigu
storefiles = ImmutableList.of();
if (!result.isEmpty()) {
// initialize the thread pool for closing store files in parallel.
- ThreadPoolExecutor storeFileCloserThreadPool = this.region
- .getStoreFileOpenAndCloseThreadPool("StoreFileCloserThread-"
- + this.family.getNameAsString());
+ ThreadPoolExecutor storeFileCloserThreadPool =
+ StoreThreadUtils.getStoreFileOpenAndCloseThreadPool("StoreFileCloserThread-"
+ + this.family.getNameAsString(), this.getHRegionInfo(), this.conf);
// close each store file in parallel
CompletionService<Void> completionService =
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreThreadUtils.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreThreadUtils.java?rev=1460199&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreThreadUtils.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreThreadUtils.java Sat Mar 23 18:18:39 2013
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.util.Threads;
+
+/**
+ * Thread utilities which are used while creating a thread pool to
+ * load store files
+ */
+public class StoreThreadUtils {
+ public static ThreadPoolExecutor getStoreOpenAndCloseThreadPool(
+ final String threadNamePrefix, HRegionInfo regionInfo, Configuration conf) {
+ int numStores = Math.max(1, regionInfo.getTableDesc().families.size());
+ int maxThreads = Math.min(numStores,
+ conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
+ HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX));
+ return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
+ }
+
+ public static ThreadPoolExecutor getStoreFileOpenAndCloseThreadPool(
+ final String threadNamePrefix, HRegionInfo regionInfo, Configuration conf) {
+ int numStores = Math.max(1, regionInfo.getTableDesc().families.size());
+ int maxThreads = Math.max(1,
+ conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
+ HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX)
+ / numStores);
+ return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
+ }
+
+ private static ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
+ final String threadNamePrefix) {
+ ThreadPoolExecutor openAndCloseThreadPool = Threads
+ .getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
+ new ThreadFactory() {
+ private int count = 1;
+
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r, threadNamePrefix + "-" + count++);
+ t.setDaemon(true);
+ return t;
+ }
+ });
+ return openAndCloseThreadPool;
+ }
+}
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=1460199&r1=1460198&r2=1460199&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java Sat Mar 23 18:18:39 2013
@@ -58,7 +58,6 @@ import org.apache.hadoop.hbase.filter.Fi
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
-import org.apache.hadoop.hbase.regionserver.HRegion.RegionScanner;
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
@@ -1417,7 +1416,7 @@ public class TestHRegion extends HBaseTe
region.put(put);
Scan scan = null;
- HRegion.RegionScanner is = null;
+ RegionScanner is = null;
//Testing to see how many scanners that is produced by getScanner, starting
//with known number, 2 - current = 1
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java?rev=1460199&r1=1460198&r2=1460199&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java Sat Mar 23 18:18:39 2013
@@ -97,7 +97,7 @@ public class TestScanWithBloomError {
LOG.info("Scanning column set: " + Arrays.toString(colSet));
Scan scan = new Scan(ROW_BYTES, ROW_BYTES);
addColumnSetToScan(scan, colSet);
- HRegion.RegionScanner scanner = (HRegion.RegionScanner)
+ RegionScanner scanner = (RegionScanner)
region.getScanner(scan);
KeyValueHeap storeHeap = scanner.getStoreHeapForTesting();
assertEquals(0, storeHeap.getHeap().size());
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java?rev=1460199&r1=1460198&r2=1460199&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java Sat Mar 23 18:18:39 2013
@@ -134,7 +134,7 @@ public class TestWideScanner extends HBa
// trigger ChangedReadersObservers
Iterator<KeyValueScanner> scanners =
- ((HRegion.RegionScanner)s).storeHeap.getHeap().iterator();
+ ((RegionScanner)s).storeHeap.getHeap().iterator();
while (scanners.hasNext()) {
StoreScanner ss = (StoreScanner)scanners.next();
ss.updateReaders();