You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2010/04/02 00:11:24 UTC
svn commit: r930112 - in /hadoop/hbase/branches/0.20: ./
src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/
src/java/org/apache/hadoop/hbase/client/
src/java/org/apache/hadoop/hbase/master/ src/java/org/apache/hadoop...
Author: stack
Date: Thu Apr 1 22:11:23 2010
New Revision: 930112
URL: http://svn.apache.org/viewvc?rev=930112&view=rev
Log:
HBASE-1537 Intra-row scanning
Added:
hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
Modified:
hadoop/hbase/branches/0.20/CHANGES.txt
hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java
hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/client/Scan.java
hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/master/BaseScanner.java
hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/master/ServerManager.java
hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java
hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java
hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/client/TestClient.java
Modified: hadoop/hbase/branches/0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/CHANGES.txt?rev=930112&r1=930111&r2=930112&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/CHANGES.txt (original)
+++ hadoop/hbase/branches/0.20/CHANGES.txt Thu Apr 1 22:11:23 2010
@@ -102,6 +102,7 @@ Release 0.20.4 - Unreleased
HBASE-2388 Give a very explicit message when we figure a big GC pause
HBASE-2270 Improve how we handle recursive calls in ExplicitColumnTracker
and WildcardColumnTracker
+ HBASE-1537 Intra-row scanning (Backported by Andrey Stepachev via Stack)
NEW FEATURES
HBASE-2257 [stargate] multiuser mode
Modified: hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java?rev=930112&r1=930111&r2=930112&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java (original)
+++ hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java Thu Apr 1 22:11:23 2010
@@ -444,13 +444,16 @@ class TransactionState {
return next;
}
- public boolean next(List<KeyValue> results) throws IOException {
+ public boolean next(List<KeyValue> results, int limit) throws IOException {
KeyValue peek = this.peek();
if (peek == null) {
return false;
}
byte [] row = peek.getRow();
results.add(peek);
+ if (limit > 0 && (results.size() == limit)) {
+ return true;
+ }
while (true){
if (this.peek() == null) {
break;
@@ -459,10 +462,16 @@ class TransactionState {
break;
}
results.add(this.next());
+ if (limit > 0 && (results.size() == limit)) {
+ break;
+ }
}
- return true;
-
+ return true;
}
-
+
+ public boolean next(List<KeyValue> results) throws IOException {
+ return next(results, -1);
+ }
+
}
}
Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/client/Scan.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/client/Scan.java?rev=930112&r1=930111&r2=930112&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/client/Scan.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/client/Scan.java Thu Apr 1 22:11:23 2010
@@ -73,6 +73,9 @@ import org.apache.hadoop.io.WritableFact
* To limit the number of versions of each column to be returned, execute
* {@link #setMaxVersions(int) setMaxVersions}.
* <p>
+ * To limit the maximum number of values returned for each call to next(), execute
+ * {@link #setBatch(int) setBatch}.
+ * <p>
* To add a filter, execute {@link #setFilter(org.apache.hadoop.hbase.filter.Filter) setFilter}.
* <p>
* Expert: To explicitly disable server-side block caching for this scan,
@@ -91,6 +94,7 @@ public class Scan implements Writable {
private byte [] startRow = HConstants.EMPTY_START_ROW;
private byte [] stopRow = HConstants.EMPTY_END_ROW;
private int maxVersions = 1;
+ private int batch = -1;
private int caching = -1;
private boolean cacheBlocks = true;
private Filter filter = null;
@@ -143,6 +147,7 @@ public class Scan implements Writable {
startRow = scan.getStartRow();
stopRow = scan.getStopRow();
maxVersions = scan.getMaxVersions();
+ batch = scan.getBatch();
caching = scan.getCaching();
cacheBlocks = scan.getCacheBlocks();
filter = scan.getFilter(); // clone?
@@ -351,6 +356,14 @@ public class Scan implements Writable {
}
/**
+ * Set the maximum number of values to return for each call to next()
+ * @param batch the maximum number of values
+ */
+ public void setBatch(int batch) {
+ this.batch = batch;
+ }
+
+ /**
* Set the number of rows for caching that will be passed to scanners.
* If not set, the default setting from {@link HTable#getScannerCaching()} will apply.
* Higher caching values will enable faster scanners but will use more memory.
@@ -448,6 +461,13 @@ public class Scan implements Writable {
}
/**
+ * @return maximum number of values to return for a single call to next()
+ */
+ public int getBatch() {
+ return this.batch;
+ }
+
+ /**
* @return caching the number of rows fetched when calling next on a scanner
*/
public int getCaching() {
@@ -593,6 +613,8 @@ public class Scan implements Writable {
sb.append(Bytes.toString(this.stopRow));
sb.append(", maxVersions=");
sb.append("" + this.maxVersions);
+ sb.append(", batch=");
+ sb.append("" + this.batch);
sb.append(", caching=");
sb.append("" + this.caching);
sb.append(", cacheBlocks=");
@@ -673,6 +695,7 @@ public class Scan implements Writable {
}
this.stopRow = Bytes.readByteArray(in);
this.maxVersions = in.readInt();
+ this.batch = in.readInt();
this.caching = in.readInt();
this.cacheBlocks = in.readBoolean();
if(in.readBoolean()) {
@@ -718,6 +741,7 @@ public class Scan implements Writable {
Bytes.writeByteArray(out, this.startRow);
Bytes.writeByteArray(out, this.stopRow);
out.writeInt(this.maxVersions);
+ out.writeInt(this.batch);
out.writeInt(this.caching);
out.writeBoolean(this.cacheBlocks);
if(this.filter == null) {
Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/master/BaseScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/master/BaseScanner.java?rev=930112&r1=930111&r2=930112&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/master/BaseScanner.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/master/BaseScanner.java Thu Apr 1 22:11:23 2010
@@ -444,7 +444,13 @@ abstract class BaseScanner extends Chore
private HRegionInfo getDaughterRegionInfo(final Result rowContent,
final byte [] which)
throws IOException {
- return Writables.getHRegionInfoOrNull(rowContent.getValue(CATALOG_FAMILY, which));
+ return getRegionInfo(rowContent, CATALOG_FAMILY, which);
+ }
+
+ private HRegionInfo getRegionInfo(final Result r, final byte [] f,
+ final byte [] q)
+ throws IOException {
+ return Writables.getHRegionInfoOrNull(r.getValue(f, q));
}
/*
@@ -530,9 +536,9 @@ abstract class BaseScanner extends Chore
final MetaRegion meta, final HRegionInfo info,
final String serverAddress, final long startCode)
throws IOException {
- String serverName = null;
String sa = serverAddress;
long sc = startCode;
+ HRegionInfo ri = info;
// Scans are sloppy. They don't respect row locks and they get and
// cache a row internally so may have data that is stale. Make sure that for
// sure we have the right server and servercode. We are trying to avoid
@@ -542,10 +548,13 @@ abstract class BaseScanner extends Chore
g.addFamily(HConstants.CATALOG_FAMILY);
Result r = regionServer.get(meta.getRegionName(), g);
if (r != null && !r.isEmpty()) {
+ // Result may not have serveraddress or startcode so below may be null.
sa = getServerAddress(r);
- // Reget startcode in case its changed in the meantime too.
sc = getStartCode(r);
+ ri = getRegionInfo(r, HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
}
+ // If serverName remains null after below machinations, region is not assigned.
+ String serverName = null;
if (sa != null && sa.length() > 0) {
serverName = HServerInfo.getServerName(sa, sc);
}
@@ -555,8 +564,8 @@ abstract class BaseScanner extends Chore
* a dead server. Regions that were on a dead server will get reassigned
* by ProcessServerShutdown
*/
- if (info.isOffline() ||
- this.master.regionManager.regionIsInTransition(info.getRegionNameAsString()) ||
+ if (ri.isOffline() ||
+ this.master.regionManager.regionIsInTransition(ri.getRegionNameAsString()) ||
(serverName != null && this.master.serverManager.isDead(serverName))) {
return;
}
@@ -569,12 +578,12 @@ abstract class BaseScanner extends Chore
if (storedInfo == null) {
// The current assignment is invalid
if (LOG.isDebugEnabled()) {
- LOG.debug("Current assignment of " + info.getRegionNameAsString() +
+ LOG.debug("Current assignment of " + ri.getRegionNameAsString() +
" is not valid; " + " serverAddress=" + sa +
", startCode=" + sc + " unknown.");
}
// Now get the region assigned
- this.master.regionManager.setUnassigned(info, true);
+ this.master.regionManager.setUnassigned(ri, true);
}
}
}
Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/master/ServerManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/master/ServerManager.java?rev=930112&r1=930111&r2=930112&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/master/ServerManager.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/master/ServerManager.java Thu Apr 1 22:11:23 2010
@@ -526,6 +526,7 @@ class ServerManager implements HConstant
* @param hri Region to assign.
*/
private void assignSplitDaughter(final HRegionInfo hri) {
+ // if (this.master.regionManager.isPendingOpen(hri.getRegionNameAsString())) return;
MetaRegion mr = this.master.regionManager.getFirstMetaRegionForRegion(hri);
Get g = new Get(hri.getRegionName());
g.addFamily(HConstants.CATALOG_FAMILY);
Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=930112&r1=930111&r2=930112&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Thu Apr 1 22:11:23 2010
@@ -1850,9 +1850,11 @@ public class HRegion implements HConstan
private Filter filter;
private RowFilterInterface oldFilter;
private List<KeyValue> results = new ArrayList<KeyValue>();
+ private int batch;
RegionScanner(Scan scan, List<KeyValueScanner> additionalScanners) {
this.filter = scan.getFilter();
+ this.batch = scan.getBatch();
this.oldFilter = scan.getOldFilter();
if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
this.stopRow = null;
@@ -1889,14 +1891,14 @@ public class HRegion implements HConstan
}
}
- public boolean next(List<KeyValue> outResults) throws IOException {
+ public boolean next(List<KeyValue> outResults, int limit) throws IOException {
if (closing.get() || closed.get()) {
close();
throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
" is closing=" + closing.get() + " or closed=" + closed.get());
}
results.clear();
- boolean returnResult = nextInternal();
+ boolean returnResult = nextInternal(limit);
if (!returnResult && filterRow()) {
results.clear();
}
@@ -1908,6 +1910,11 @@ public class HRegion implements HConstan
return returnResult;
}
+ public boolean next(List<KeyValue> outResults) throws IOException {
+ // apply the batching limit by default
+ return next(outResults, batch);
+ }
+
/*
* @return True if a filter rules the scanner is over, done.
*/
@@ -1922,7 +1929,7 @@ public class HRegion implements HConstan
* @return true if there are more rows, false if scanner is done
* @throws IOException
*/
- private boolean nextInternal() throws IOException {
+ private boolean nextInternal(int limit) throws IOException {
while (true) {
byte[] currentRow = peekRow();
if (isStopRow(currentRow)) {
@@ -1932,7 +1939,10 @@ public class HRegion implements HConstan
} else {
byte[] nextRow;
do {
- this.storeHeap.next(results);
+ this.storeHeap.next(results, limit);
+ if (limit > 0 && results.size() == limit) {
+ return true;
+ }
} while (Bytes.equals(currentRow, nextRow = peekRow()));
final boolean stopRow = isStopRow(nextRow);
Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java?rev=930112&r1=930111&r2=930112&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java Thu Apr 1 22:11:23 2010
@@ -46,12 +46,21 @@ public interface InternalScanner extends
* @return true if more rows exist after this one, false if scanner is done
* @throws IOException
*/
- public boolean next(List<KeyValue> results)
- throws IOException;
-
+ public boolean next(List<KeyValue> results) throws IOException;
+
+ /**
+ * Grab the next row's worth of values with a limit on the number of values
+ * to return.
+ * @param result
+ * @param limit
+ * @return true if more rows exist after this one, false if scanner is done
+ * @throws IOException
+ */
+ public boolean next(List<KeyValue> result, int limit) throws IOException;
+
/**
* Closes the scanner and releases any resources it has allocated
* @throws IOException
*/
public void close() throws IOException;
-}
\ No newline at end of file
+}
Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java?rev=930112&r1=930111&r2=930112&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java Thu Apr 1 22:11:23 2010
@@ -99,11 +99,13 @@ public class KeyValueHeap implements Key
* <p>
* This can ONLY be called when you are using Scanners that implement
* InternalScanner as well as KeyValueScanner (a {@link StoreScanner}).
+ * @param result
+ * @param limit
* @return true if there are more keys, false if all scanners are done
*/
- public boolean next(List<KeyValue> result) throws IOException {
+ public boolean next(List<KeyValue> result, int limit) throws IOException {
InternalScanner currentAsInternal = (InternalScanner)this.current;
- currentAsInternal.next(result);
+ currentAsInternal.next(result, limit);
KeyValue pee = this.current.peek();
if (pee == null) {
this.current.close();
@@ -113,7 +115,21 @@ public class KeyValueHeap implements Key
this.current = this.heap.poll();
return (this.current != null);
}
-
+
+ /**
+ * Gets the next row of keys from the top-most scanner.
+ * <p>
+ * This method takes care of updating the heap.
+ * <p>
+ * This can ONLY be called when you are using Scanners that implement
+ * InternalScanner as well as KeyValueScanner (a {@link StoreScanner}).
+ * @param result
+ * @return true if there are more keys, false if all scanners are done
+ */
+ public boolean next(List<KeyValue> result) throws IOException {
+ return next(result, -1);
+ }
+
private class KVScannerComparator implements Comparator<KeyValueScanner> {
private KVComparator kvComparator;
/**
Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java?rev=930112&r1=930111&r2=930112&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java Thu Apr 1 22:11:23 2010
@@ -170,6 +170,12 @@ public class MinorCompactingStoreScanner
return false;
}
+ @Override
+ public boolean next(List<KeyValue> results, int limit) throws IOException {
+ // should not use limits with minor compacting store scanner; the limit is ignored
+ return next(results);
+ }
+
public void close() {
heap.close();
}
Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=930112&r1=930111&r2=930112&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Thu Apr 1 22:11:23 2010
@@ -146,9 +146,10 @@ class StoreScanner implements KeyValueSc
/**
* Get the next row of values from this Store.
* @param result
+ * @param limit
* @return true if there are more rows, false if scanner is done
*/
- public synchronized boolean next(List<KeyValue> outResult) throws IOException {
+ public synchronized boolean next(List<KeyValue> outResult, int limit) throws IOException {
KeyValue peeked = this.heap.peek();
if (peeked == null) {
close();
@@ -157,14 +158,17 @@ class StoreScanner implements KeyValueSc
matcher.setRow(peeked.getRow());
KeyValue kv;
List<KeyValue> results = new ArrayList<KeyValue>();
- while((kv = this.heap.peek()) != null) {
+ LOOP: while((kv = this.heap.peek()) != null) {
QueryMatcher.MatchCode qcode = matcher.match(kv);
switch(qcode) {
case INCLUDE:
KeyValue next = this.heap.next();
results.add(next);
+ if (limit > 0 && (results.size() == limit)) {
+ break LOOP;
+ }
continue;
-
+
case DONE:
// copy jazz
outResult.addAll(results);
@@ -209,6 +213,10 @@ class StoreScanner implements KeyValueSc
return false;
}
+ public synchronized boolean next(List<KeyValue> outResult) throws IOException {
+ return next(outResult, -1);
+ }
+
private List<KeyValueScanner> getStoreFileScanners() {
List<HFileScanner> s =
new ArrayList<HFileScanner>(this.store.getStorefilesCount());
Modified: hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/client/TestClient.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/client/TestClient.java?rev=930112&r1=930111&r2=930112&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/client/TestClient.java (original)
+++ hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/client/TestClient.java Thu Apr 1 22:11:23 2010
@@ -1522,6 +1522,15 @@ public class TestClient extends HBaseClu
put.add(FAMILIES[2], QUALIFIER, ts[2], VALUES[2]);
put.add(FAMILIES[2], QUALIFIER, ts[3], VALUES[3]);
ht.put(put);
+
+ // Assert that above went in.
+ get = new Get(ROWS[2]);
+ get.addFamily(FAMILIES[1]);
+ get.addFamily(FAMILIES[2]);
+ get.setMaxVersions(Integer.MAX_VALUE);
+ result = ht.get(get);
+ assertTrue("Expected 4 key but received " + result.size() + ": " + result,
+ result.size() == 4);
delete = new Delete(ROWS[0]);
delete.deleteFamily(FAMILIES[2]);
@@ -1582,7 +1591,7 @@ public class TestClient extends HBaseClu
get.addFamily(FAMILIES[2]);
get.setMaxVersions(Integer.MAX_VALUE);
result = ht.get(get);
- assertTrue("Expected 1 key but received " + result.size(),
+ assertTrue("Expected 1 key but received " + result.size() + ": " + result,
result.size() == 1);
assertNResult(result, ROWS[2], FAMILIES[2], QUALIFIER,
new long [] {ts[2]},
Added: hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestWideScanner.java?rev=930112&view=auto
==============================================================================
--- hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestWideScanner.java (added)
+++ hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestWideScanner.java Thu Apr 1 22:11:23 2010
@@ -0,0 +1,126 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestCase;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+public class TestWideScanner extends HBaseTestCase {
+ private final Log LOG = LogFactory.getLog(this.getClass());
+
+ final int BATCH = 1000;
+
+ private MiniDFSCluster cluster = null;
+ private HRegion r;
+
+ static final HTableDescriptor TESTTABLEDESC =
+ new HTableDescriptor("testwidescan");
+ static {
+ TESTTABLEDESC.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY,
+ 10, // Ten is an arbitrary number. Keep versions to help debugging.
+ Compression.Algorithm.NONE.getName(), false, true, 8 * 1024,
+ HConstants.FOREVER, false));
+ }
+ /** HRegionInfo for the testwidescan table, with empty start and end keys */
+ public static final HRegionInfo REGION_INFO =
+ new HRegionInfo(TESTTABLEDESC, HConstants.EMPTY_BYTE_ARRAY,
+ HConstants.EMPTY_BYTE_ARRAY);
+
+ @Override
+ public void setUp() throws Exception {
+ cluster = new MiniDFSCluster(conf, 2, true, (String[])null);
+ // Set the hbase.rootdir to be the home directory in mini dfs.
+ this.conf.set(HConstants.HBASE_DIR,
+ this.cluster.getFileSystem().getHomeDirectory().toString());
+ super.setUp();
+ }
+
+ private int addWideContent(HRegion region, byte[] family)
+ throws IOException {
+ int count = 0;
+ // add a few rows of 2500 columns (we'll use batch of 1000) to make things
+ // interesting
+ for (char c = 'a'; c <= 'c'; c++) {
+ byte[] row = Bytes.toBytes("ab" + c);
+ int i;
+ for (i = 0; i < 2500; i++) {
+ byte[] b = Bytes.toBytes(String.format("%10d", i));
+ Put put = new Put(row);
+ put.add(family, b, b);
+ region.put(put);
+ count++;
+ }
+ }
+ // add one row of 100,000 columns
+ {
+ byte[] row = Bytes.toBytes("abf");
+ int i;
+ for (i = 0; i < 100000; i++) {
+ byte[] b = Bytes.toBytes(String.format("%10d", i));
+ Put put = new Put(row);
+ put.add(family, b, b);
+ region.put(put);
+ count++;
+ }
+ }
+ return count;
+ }
+
+ public void testWideScanBatching() throws IOException {
+ try {
+ this.r = createNewHRegion(REGION_INFO.getTableDesc(), null, null);
+ int inserted = addWideContent(this.r, HConstants.CATALOG_FAMILY);
+ List<KeyValue> results = new ArrayList<KeyValue>();
+ Scan scan = new Scan();
+ scan.addFamily(HConstants.CATALOG_FAMILY);
+ scan.setBatch(BATCH);
+ InternalScanner s = r.getScanner(scan);
+ int total = 0;
+ int i = 0;
+ boolean more;
+ do {
+ more = s.next(results);
+ i++;
+ LOG.info("iteration #" + i + ", results.size=" + results.size());
+
+ // assert that the result set is no larger than BATCH
+ assertTrue(results.size() <= BATCH);
+
+ total += results.size();
+
+ if (results.size() > 0) {
+ // assert that all results are from the same row
+ byte[] row = results.get(0).getRow();
+ for (KeyValue kv: results) {
+ assertTrue(Bytes.equals(row, kv.getRow()));
+ }
+ }
+
+ results.clear();
+ } while (more);
+
+ // assert that the scanner returned all values
+ LOG.info("inserted " + inserted + ", scanned " + total);
+ assertTrue(total == inserted);
+
+ s.close();
+ } finally {
+ this.r.close();
+ this.r.getLog().closeAndDelete();
+ shutdownDfs(this.cluster);
+ }
+ }
+}