Posted to commits@hbase.apache.org by st...@apache.org on 2010/04/02 00:11:24 UTC

svn commit: r930112 - in /hadoop/hbase/branches/0.20: ./ src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/ src/java/org/apache/hadoop/hbase/client/ src/java/org/apache/hadoop/hbase/master/ src/java/org/apache/hadoop...

Author: stack
Date: Thu Apr  1 22:11:23 2010
New Revision: 930112

URL: http://svn.apache.org/viewvc?rev=930112&view=rev
Log:
HBASE-1537 Intra-row scanning
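
For readers coming to this from the CHANGES entry: the feature lets a Scan cap how many values come back from each call to next(), so one very wide row no longer has to be materialized in a single round trip. A minimal client-side sketch of the new API (the table and family names below are made up for illustration and are not part of this commit):

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    public class IntraRowScanExample {
      public static void main(String[] args) throws Exception {
        HTable table = new HTable(new HBaseConfiguration(), "widetable"); // hypothetical table
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes("fam"));                             // hypothetical family
        scan.setBatch(1000);   // return at most 1000 values per call to next()
        ResultScanner scanner = table.getScanner(scan);
        try {
          for (Result partial : scanner) {
            // With a batch set, one wide row can arrive as several Results;
            // consecutive Results may carry the same row key.
            System.out.println(Bytes.toString(partial.getRow()) +
                ": " + partial.size() + " values");
          }
        } finally {
          scanner.close();
        }
      }
    }

Callers that assume one Result per row need to start grouping by row key once they set a batch size.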

Added:
    hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
Modified:
    hadoop/hbase/branches/0.20/CHANGES.txt
    hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/client/Scan.java
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/master/BaseScanner.java
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/master/ServerManager.java
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
    hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/client/TestClient.java

Modified: hadoop/hbase/branches/0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/CHANGES.txt?rev=930112&r1=930111&r2=930112&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/CHANGES.txt (original)
+++ hadoop/hbase/branches/0.20/CHANGES.txt Thu Apr  1 22:11:23 2010
@@ -102,6 +102,7 @@ Release 0.20.4 - Unreleased
    HBASE-2388  Give a very explicit message when we figure a big GC pause
    HBASE-2270  Improve how we handle recursive calls in ExplicitColumnTracker 
                and WildcardColumnTracker
+   HBASE-1537  Intra-row scanning (Backported by Andrey Stepachev via Stack)
 
   NEW FEATURES
    HBASE-2257  [stargate] multiuser mode

Modified: hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java?rev=930112&r1=930111&r2=930112&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java (original)
+++ hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java Thu Apr  1 22:11:23 2010
@@ -444,13 +444,16 @@ class TransactionState {
       return next;
     }
 
-    public boolean next(List<KeyValue> results) throws IOException {
+    public boolean next(List<KeyValue> results, int limit) throws IOException {
         KeyValue peek = this.peek();
         if (peek == null) {
           return false;
         }
         byte [] row = peek.getRow();
         results.add(peek);
+        if (limit > 0 && (results.size() == limit)) {
+          return true;
+        }
         while (true){
           if (this.peek() == null) {
             break;
@@ -459,10 +462,16 @@ class TransactionState {
             break;
           }
           results.add(this.next());
+          if (limit > 0 && (results.size() == limit)) {
+            break;
+          }
         }
-        return true;
-        
+        return true;        
     }
-    
+
+    public boolean next(List<KeyValue> results) throws IOException {
+      return next(results, -1);
+    }
+
    }
 }

Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/client/Scan.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/client/Scan.java?rev=930112&r1=930111&r2=930112&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/client/Scan.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/client/Scan.java Thu Apr  1 22:11:23 2010
@@ -73,6 +73,9 @@ import org.apache.hadoop.io.WritableFact
  * To limit the number of versions of each column to be returned, execute
  * {@link #setMaxVersions(int) setMaxVersions}.
  * <p>
+ * To limit the maximum number of values returned for each call to next(), execute 
+ * {@link #setBatch(int) setBatch}.
+ * <p>
  * To add a filter, execute {@link #setFilter(org.apache.hadoop.hbase.filter.Filter) setFilter}.
  * <p>
  * Expert: To explicitly disable server-side block caching for this scan, 
@@ -91,6 +94,7 @@ public class Scan implements Writable {
   private byte [] startRow = HConstants.EMPTY_START_ROW;
   private byte [] stopRow  = HConstants.EMPTY_END_ROW;
   private int maxVersions = 1;
+  private int batch = -1;
   private int caching = -1;
   private boolean cacheBlocks = true;
   private Filter filter = null;
@@ -143,6 +147,7 @@ public class Scan implements Writable {
     startRow = scan.getStartRow();
     stopRow  = scan.getStopRow();
     maxVersions = scan.getMaxVersions();
+    batch = scan.getBatch();
     caching = scan.getCaching();
     cacheBlocks = scan.getCacheBlocks();
     filter = scan.getFilter(); // clone?
@@ -351,6 +356,14 @@ public class Scan implements Writable {
   }
 
   /**
+   * Set the maximum number of values to return for each call to next()
+   * @param batch the maximum number of values
+   */
+  public void setBatch(int batch) {
+    this.batch = batch;
+  }
+
+  /**
    * Set the number of rows for caching that will be passed to scanners.
    * If not set, the default setting from {@link HTable#getScannerCaching()} will apply.
    * Higher caching values will enable faster scanners but will use more memory.
@@ -448,6 +461,13 @@ public class Scan implements Writable {
   } 
 
   /**
+   * @return maximum number of values to return for a single call to next()
+   */
+  public int getBatch() {
+    return this.batch;
+  }
+
+  /**
    * @return caching the number of rows fetched when calling next on a scanner
    */
   public int getCaching() {
@@ -593,6 +613,8 @@ public class Scan implements Writable {
     sb.append(Bytes.toString(this.stopRow));
     sb.append(", maxVersions=");
     sb.append("" + this.maxVersions);
+    sb.append(", batch=");
+    sb.append("" + this.batch);
     sb.append(", caching=");
     sb.append("" + this.caching);
     sb.append(", cacheBlocks=");
@@ -673,6 +695,7 @@ public class Scan implements Writable {
     }
     this.stopRow = Bytes.readByteArray(in);
     this.maxVersions = in.readInt();
+    this.batch = in.readInt();
     this.caching = in.readInt();
     this.cacheBlocks = in.readBoolean();
     if(in.readBoolean()) {
@@ -718,6 +741,7 @@ public class Scan implements Writable {
     Bytes.writeByteArray(out, this.startRow);
     Bytes.writeByteArray(out, this.stopRow);
     out.writeInt(this.maxVersions);
+    out.writeInt(this.batch);
     out.writeInt(this.caching);
     out.writeBoolean(this.cacheBlocks);
     if(this.filter == null) {
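
One operational note on the Scan change above: the new batch field is serialized between maxVersions and caching in both readFields() and write(), so client and server must both carry this patch (or both lack it) for a Scan to travel the wire correctly. A small round-trip sketch using the Writables helper this branch already uses elsewhere (the class name is illustrative):

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Writables;

    public class ScanBatchRoundTrip {
      public static void main(String[] args) throws Exception {
        Scan scan = new Scan();
        scan.setBatch(1000);
        // Serialize and deserialize the way the RPC layer would.
        byte[] bytes = Writables.getBytes(scan);
        Scan copy = (Scan) Writables.getWritable(bytes, new Scan());
        System.out.println("batch after round trip: " + copy.getBatch()); // expect 1000
      }
    }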

Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/master/BaseScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/master/BaseScanner.java?rev=930112&r1=930111&r2=930112&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/master/BaseScanner.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/master/BaseScanner.java Thu Apr  1 22:11:23 2010
@@ -444,7 +444,13 @@ abstract class BaseScanner extends Chore
   private HRegionInfo getDaughterRegionInfo(final Result rowContent,
     final byte [] which)
   throws IOException {
-    return Writables.getHRegionInfoOrNull(rowContent.getValue(CATALOG_FAMILY, which));
+    return getRegionInfo(rowContent, CATALOG_FAMILY, which);
+  }
+
+  private HRegionInfo getRegionInfo(final Result r, final byte [] f,
+    final byte [] q)
+  throws IOException {
+    return Writables.getHRegionInfoOrNull(r.getValue(f, q));
   }
 
   /*
@@ -530,9 +536,9 @@ abstract class BaseScanner extends Chore
     final MetaRegion meta, final HRegionInfo info,
     final String serverAddress, final long startCode) 
   throws IOException {
-    String serverName = null;
     String sa = serverAddress;
     long sc = startCode;
+    HRegionInfo ri = info;
     // Scans are sloppy. They don't respect row locks and they get and
     // cache a row internally so may have data that is stale. Make sure that for
     // sure we have the right server and servercode. We are trying to avoid
@@ -542,10 +548,13 @@ abstract class BaseScanner extends Chore
     g.addFamily(HConstants.CATALOG_FAMILY);
     Result r = regionServer.get(meta.getRegionName(), g);
     if (r != null && !r.isEmpty()) {
+      // Result may not have serveraddress or startcode so below may be null.
       sa = getServerAddress(r);
-      // Reget startcode in case its changed in the meantime too.
       sc = getStartCode(r);
+      ri = getRegionInfo(r, HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
     }
+    // If serverName remains null after below machinations, region is not assigned.
+    String serverName = null;
     if (sa != null && sa.length() > 0) {
       serverName = HServerInfo.getServerName(sa, sc);
     }
@@ -555,8 +564,8 @@ abstract class BaseScanner extends Chore
        * a dead server. Regions that were on a dead server will get reassigned
        * by ProcessServerShutdown
        */
-      if (info.isOffline() ||
-        this.master.regionManager.regionIsInTransition(info.getRegionNameAsString()) ||
+      if (ri.isOffline() ||
+        this.master.regionManager.regionIsInTransition(ri.getRegionNameAsString()) ||
           (serverName != null && this.master.serverManager.isDead(serverName))) {
         return;
       }
@@ -569,12 +578,12 @@ abstract class BaseScanner extends Chore
       if (storedInfo == null) {
         // The current assignment is invalid
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Current assignment of " + info.getRegionNameAsString() +
+          LOG.debug("Current assignment of " + ri.getRegionNameAsString() +
             " is not valid; " + " serverAddress=" + sa +
             ", startCode=" + sc + " unknown.");
         }
         // Now get the region assigned
-        this.master.regionManager.setUnassigned(info, true);
+        this.master.regionManager.setUnassigned(ri, true);
       }
     }
   }

Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/master/ServerManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/master/ServerManager.java?rev=930112&r1=930111&r2=930112&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/master/ServerManager.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/master/ServerManager.java Thu Apr  1 22:11:23 2010
@@ -526,6 +526,7 @@ class ServerManager implements HConstant
    * @param hri Region to assign.
    */
   private void assignSplitDaughter(final HRegionInfo hri) {
+    // if (this.master.regionManager.isPendingOpen(hri.getRegionNameAsString())) return;
     MetaRegion mr = this.master.regionManager.getFirstMetaRegionForRegion(hri);
     Get g = new Get(hri.getRegionName());
     g.addFamily(HConstants.CATALOG_FAMILY);

Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=930112&r1=930111&r2=930112&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Thu Apr  1 22:11:23 2010
@@ -1850,9 +1850,11 @@ public class HRegion implements HConstan
     private Filter filter;
     private RowFilterInterface oldFilter;
     private List<KeyValue> results = new ArrayList<KeyValue>();
+    private int batch;
 
     RegionScanner(Scan scan, List<KeyValueScanner> additionalScanners) {
       this.filter = scan.getFilter();
+      this.batch = scan.getBatch();
       this.oldFilter = scan.getOldFilter();
       if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
         this.stopRow = null;
@@ -1889,14 +1891,14 @@ public class HRegion implements HConstan
       }
     }
 
-    public boolean next(List<KeyValue> outResults) throws IOException {
+    public boolean next(List<KeyValue> outResults, int limit) throws IOException {
       if (closing.get() || closed.get()) {
         close();
         throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
           " is closing=" + closing.get() + " or closed=" + closed.get());
       }
       results.clear();
-      boolean returnResult = nextInternal();
+      boolean returnResult = nextInternal(limit);
       if (!returnResult && filterRow()) {
         results.clear();
       }
@@ -1908,6 +1910,11 @@ public class HRegion implements HConstan
       return returnResult;
     }
 
+    public boolean next(List<KeyValue> outResults) throws IOException {
+      // apply the batching limit by default
+      return next(outResults, batch);
+    }
+
     /*
      * @return True if a filter rules the scanner is over, done.
      */
@@ -1922,7 +1929,7 @@ public class HRegion implements HConstan
     * @return true if there are more rows, false if scanner is done
     * @throws IOException
     */
-    private boolean nextInternal() throws IOException {
+    private boolean nextInternal(int limit) throws IOException {
       while (true) {
         byte[] currentRow = peekRow();
         if (isStopRow(currentRow)) {
@@ -1932,7 +1939,10 @@ public class HRegion implements HConstan
         } else {
           byte[] nextRow;
           do {
-            this.storeHeap.next(results);
+            this.storeHeap.next(results, limit);
+            if (limit > 0 && results.size() == limit) {
+              return true;
+            }
           } while (Bytes.equals(currentRow, nextRow = peekRow()));
 
           final boolean stopRow = isStopRow(nextRow);
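
The behavioural contract in RegionScanner is the important part: when a positive limit is hit mid-row, next(results, limit) returns true even though the current row is not exhausted, so successive calls can hand back pieces of the same row. A hedged sketch of a caller that reassembles whole rows from batched calls (s is an InternalScanner from HRegion.getScanner(scan) with a batch set, as in the new test further down; handleRow() is a hypothetical per-row consumer; java.util and org.apache.hadoop.hbase imports elided):

    static void drainByRow(InternalScanner s) throws IOException {
      List<KeyValue> batch = new ArrayList<KeyValue>();
      List<KeyValue> row = new ArrayList<KeyValue>();
      byte[] currentRow = null;
      boolean more;
      do {
        more = s.next(batch);              // RegionScanner applies scan.getBatch() here
        for (KeyValue kv : batch) {
          if (currentRow != null && !Bytes.equals(currentRow, kv.getRow())) {
            handleRow(currentRow, row);    // hypothetical consumer of one complete row
            row.clear();
          }
          currentRow = kv.getRow();
          row.add(kv);
        }
        batch.clear();
      } while (more);
      if (!row.isEmpty()) {
        handleRow(currentRow, row);
      }
    }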

Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java?rev=930112&r1=930111&r2=930112&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java Thu Apr  1 22:11:23 2010
@@ -46,12 +46,21 @@ public interface InternalScanner extends
    * @return true if more rows exist after this one, false if scanner is done
    * @throws IOException
    */
-  public boolean next(List<KeyValue> results)
-  throws IOException;
-  
+  public boolean next(List<KeyValue> results) throws IOException;
+
+  /**
+   * Grab the next row's worth of values with a limit on the number of values
+   * to return. 
+   * @param results
+   * @param limit
+   * @return true if more rows exist after this one, false if scanner is done
+   * @throws IOException
+   */
+  public boolean next(List<KeyValue> result, int limit) throws IOException;
+
   /**
    * Closes the scanner and releases any resources it has allocated
    * @throws IOException
    */
   public void close() throws IOException;
-}
\ No newline at end of file
+}
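
Adding a second method to the interface means every implementation has to provide both signatures; this commit updates the in-tree ones (including the transactional contrib scanner above). The convention it follows in KeyValueHeap and StoreScanner below is that the one-argument form delegates with -1 meaning "no limit"; a minimal illustrative adapter, not part of the patch:

    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.hbase.KeyValue;

    class LimitingScannerAdapter implements InternalScanner {
      private final InternalScanner delegate;
      LimitingScannerAdapter(InternalScanner delegate) { this.delegate = delegate; }
      public boolean next(List<KeyValue> results, int limit) throws IOException {
        return delegate.next(results, limit);
      }
      public boolean next(List<KeyValue> results) throws IOException {
        return next(results, -1);   // any limit <= 0 means "no limit"
      }
      public void close() throws IOException { delegate.close(); }
    }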

Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java?rev=930112&r1=930111&r2=930112&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java Thu Apr  1 22:11:23 2010
@@ -99,11 +99,13 @@ public class KeyValueHeap implements Key
    * <p>
    * This can ONLY be called when you are using Scanners that implement
    * InternalScanner as well as KeyValueScanner (a {@link StoreScanner}).
+   * @param result
+   * @param limit
    * @return true if there are more keys, false if all scanners are done 
    */
-  public boolean next(List<KeyValue> result) throws IOException {
+  public boolean next(List<KeyValue> result, int limit) throws IOException {
     InternalScanner currentAsInternal = (InternalScanner)this.current;
-    currentAsInternal.next(result);
+    currentAsInternal.next(result, limit);
     KeyValue pee = this.current.peek();
     if (pee == null) {
       this.current.close();
@@ -113,7 +115,21 @@ public class KeyValueHeap implements Key
     this.current = this.heap.poll();
     return (this.current != null);
   }
-  
+
+  /**
+   * Gets the next row of keys from the top-most scanner.
+   * <p>
+   * This method takes care of updating the heap.
+   * <p>
+   * This can ONLY be called when you are using Scanners that implement
+   * InternalScanner as well as KeyValueScanner (a {@link StoreScanner}).
+   * @param result
+   * @return true if there are more keys, false if all scanners are done 
+   */
+  public boolean next(List<KeyValue> result) throws IOException {
+    return next(result, -1);
+  }
+
   private class KVScannerComparator implements Comparator<KeyValueScanner> {
     private KVComparator kvComparator;
     /**

Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java?rev=930112&r1=930111&r2=930112&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java Thu Apr  1 22:11:23 2010
@@ -170,6 +170,12 @@ public class MinorCompactingStoreScanner
     return false;
   }
 
+  @Override
+  public boolean next(List<KeyValue> results, int limit) throws IOException {
+    // should not use limits with minor compacting store scanner
+    return next(results);
+  }
+
   public void close() {
     heap.close();
   }

Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=930112&r1=930111&r2=930112&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Thu Apr  1 22:11:23 2010
@@ -146,9 +146,10 @@ class StoreScanner implements KeyValueSc
   /**
    * Get the next row of values from this Store.
    * @param result
+   * @param limit
    * @return true if there are more rows, false if scanner is done
    */
-  public synchronized boolean next(List<KeyValue> outResult) throws IOException {
+  public synchronized boolean next(List<KeyValue> outResult, int limit) throws IOException {
     KeyValue peeked = this.heap.peek();
     if (peeked == null) {
       close();
@@ -157,14 +158,17 @@ class StoreScanner implements KeyValueSc
     matcher.setRow(peeked.getRow());
     KeyValue kv;
     List<KeyValue> results = new ArrayList<KeyValue>();
-    while((kv = this.heap.peek()) != null) {
+    LOOP: while((kv = this.heap.peek()) != null) {
       QueryMatcher.MatchCode qcode = matcher.match(kv);
       switch(qcode) {
         case INCLUDE:
           KeyValue next = this.heap.next();
           results.add(next);
+          if (limit > 0 && (results.size() == limit)) {
+            break LOOP;
+          }
           continue;
-          
+
         case DONE:
           // copy jazz
           outResult.addAll(results);
@@ -209,6 +213,10 @@ class StoreScanner implements KeyValueSc
     return false;
   }
 
+  public synchronized boolean next(List<KeyValue> outResult) throws IOException {
+    return next(outResult, -1);
+  }
+
   private List<KeyValueScanner> getStoreFileScanners() {
     List<HFileScanner> s =
       new ArrayList<HFileScanner>(this.store.getStorefilesCount());

Modified: hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/client/TestClient.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/client/TestClient.java?rev=930112&r1=930111&r2=930112&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/client/TestClient.java (original)
+++ hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/client/TestClient.java Thu Apr  1 22:11:23 2010
@@ -1522,6 +1522,15 @@ public class TestClient extends HBaseClu
     put.add(FAMILIES[2], QUALIFIER, ts[2], VALUES[2]);
     put.add(FAMILIES[2], QUALIFIER, ts[3], VALUES[3]);
     ht.put(put);
+
+    // Assert that above went in.
+    get = new Get(ROWS[2]);
+    get.addFamily(FAMILIES[1]);
+    get.addFamily(FAMILIES[2]);
+    get.setMaxVersions(Integer.MAX_VALUE);
+    result = ht.get(get);
+    assertTrue("Expected 4 key but received " + result.size() + ": " + result,
+        result.size() == 4);
     
     delete = new Delete(ROWS[0]);
     delete.deleteFamily(FAMILIES[2]);
@@ -1582,7 +1591,7 @@ public class TestClient extends HBaseClu
     get.addFamily(FAMILIES[2]);
     get.setMaxVersions(Integer.MAX_VALUE);
     result = ht.get(get);
-    assertTrue("Expected 1 key but received " + result.size(),
+    assertTrue("Expected 1 key but received " + result.size() + ": " + result,
         result.size() == 1);
     assertNResult(result, ROWS[2], FAMILIES[2], QUALIFIER, 
         new long [] {ts[2]},

Added: hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestWideScanner.java?rev=930112&view=auto
==============================================================================
--- hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestWideScanner.java (added)
+++ hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestWideScanner.java Thu Apr  1 22:11:23 2010
@@ -0,0 +1,126 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestCase;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+public class TestWideScanner extends HBaseTestCase {
+  private final Log LOG = LogFactory.getLog(this.getClass());
+
+  final int BATCH = 1000;
+
+  private MiniDFSCluster cluster = null;
+  private HRegion r;
+
+  static final HTableDescriptor TESTTABLEDESC =
+    new HTableDescriptor("testwidescan");
+  static {
+    TESTTABLEDESC.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY,
+      10,  // Ten is an arbitrary number.  Keep versions to help debugging.
+      Compression.Algorithm.NONE.getName(), false, true, 8 * 1024,
+      HConstants.FOREVER, false));
+  }
+  /** HRegionInfo for root region */
+  public static final HRegionInfo REGION_INFO =
+    new HRegionInfo(TESTTABLEDESC, HConstants.EMPTY_BYTE_ARRAY,
+    HConstants.EMPTY_BYTE_ARRAY);
+
+  @Override
+  public void setUp() throws Exception {
+    cluster = new MiniDFSCluster(conf, 2, true, (String[])null);
+    // Set the hbase.rootdir to be the home directory in mini dfs.
+    this.conf.set(HConstants.HBASE_DIR,
+      this.cluster.getFileSystem().getHomeDirectory().toString());
+    super.setUp();    
+  }
+
+  private int addWideContent(HRegion region, byte[] family) 
+      throws IOException {
+    int count = 0;
+    // add a few rows of 2500 columns (we'll use batch of 1000) to make things
+    // interesting
+    for (char c = 'a'; c <= 'c'; c++) {
+      byte[] row = Bytes.toBytes("ab" + c);
+      int i;
+      for (i = 0; i < 2500; i++) {
+        byte[] b = Bytes.toBytes(String.format("%10d", i));
+        Put put = new Put(row);
+        put.add(family, b, b);
+        region.put(put);
+        count++;
+      }
+    }
+    // add one row of 100,000 columns
+    {
+      byte[] row = Bytes.toBytes("abf");
+      int i;
+      for (i = 0; i < 100000; i++) {
+        byte[] b = Bytes.toBytes(String.format("%10d", i));
+        Put put = new Put(row);
+        put.add(family, b, b);
+        region.put(put);
+        count++;
+      }
+    }
+    return count;
+  }
+
+  public void testWideScanBatching() throws IOException {
+    try {
+      this.r = createNewHRegion(REGION_INFO.getTableDesc(), null, null);
+      int inserted = addWideContent(this.r, HConstants.CATALOG_FAMILY);
+      List<KeyValue> results = new ArrayList<KeyValue>();
+      Scan scan = new Scan();
+      scan.addFamily(HConstants.CATALOG_FAMILY);
+      scan.setBatch(BATCH);
+      InternalScanner s = r.getScanner(scan);
+      int total = 0;
+      int i = 0;
+      boolean more;
+      do {
+        more = s.next(results);
+        i++;
+        LOG.info("iteration #" + i + ", results.size=" + results.size());
+
+        // assert that the result set is no larger than BATCH
+        assertTrue(results.size() <= BATCH);
+
+        total += results.size();
+
+        if (results.size() > 0) {
+          // assert that all results are from the same row
+          byte[] row = results.get(0).getRow();
+          for (KeyValue kv: results) {
+            assertTrue(Bytes.equals(row, kv.getRow()));
+          }
+        }
+
+        results.clear();
+      } while (more);
+
+      // assert that the scanner returned all values
+      LOG.info("inserted " + inserted + ", scanned " + total);
+      assertTrue(total == inserted);
+
+      s.close();
+    } finally {
+      this.r.close();
+      this.r.getLog().closeAndDelete();
+      shutdownDfs(this.cluster);
+    }
+  }
+}
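
The test asserts an upper bound (results.size() <= BATCH) plus a total count. A sharper check, sketched here purely as an illustration and not part of the committed test, would pin down the limit behaviour by counting that the 100,000-column row "abf" arrives in exactly 100 full batches of 1,000 (this reuses the test's s, BATCH and Bytes, and assumes a freshly opened scanner over the same data):

    int fullBatches = 0;
    List<KeyValue> batch = new ArrayList<KeyValue>();
    while (s.next(batch)) {
      if (batch.size() == BATCH
          && Bytes.equals(batch.get(0).getRow(), Bytes.toBytes("abf"))) {
        fullBatches++;
      }
      batch.clear();
    }
    assertEquals("row abf should arrive in 100,000 / 1,000 batches", 100, fullBatches);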