You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ra...@apache.org on 2010/05/15 02:28:24 UTC

svn commit: r944534 - /hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

Author: rawson
Date: Sat May 15 00:28:24 2010
New Revision: 944534

URL: http://svn.apache.org/viewvc?rev=944534&view=rev
Log:
Make TestHRegion pass by porting the RegionScanner implementation from 0.20

Modified:
    hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=944534&r1=944533&r2=944534&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sat May 15 00:28:24 2010
@@ -1,4 +1,4 @@
-/**
+/*
  * Copyright 2010 The Apache Software Foundation
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -54,27 +54,25 @@ import org.apache.hadoop.hbase.util.Writ
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.StringUtils;
 
- import java.io.IOException;
- import java.io.UnsupportedEncodingException;
- import java.lang.reflect.Constructor;
- import java.util.AbstractList;
- import java.util.ArrayList;
- import java.util.Collection;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.Set;
- import java.util.NavigableSet;
- import java.util.TreeMap;
- import java.util.TreeSet;
- import java.util.HashMap;
- import java.util.HashSet;
- import java.util.Random;
- import java.util.concurrent.Callable;
- import java.util.concurrent.ConcurrentSkipListMap;
- import java.util.concurrent.atomic.AtomicBoolean;
- import java.util.concurrent.atomic.AtomicLong;
- import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Constructor;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * HRegion stores data for a certain region of a table.  It stores all columns
@@ -1937,7 +1935,6 @@ public class HRegion implements HConstan
       //DebugPrint.println("HRegionScanner.<init>");
       this.filter = scan.getFilter();
       this.batch = scan.getBatch();
-
       if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
         this.stopRow = null;
       } else {
@@ -1969,7 +1966,6 @@ public class HRegion implements HConstan
         new KeyValueHeap(scanners.toArray(new KeyValueScanner[0]), comparator);
     }
 
-
     private void resetFilters() {
       if (filter != null) {
         filter.reset();
@@ -2025,70 +2021,64 @@ public class HRegion implements HConstan
       return this.filter != null && this.filter.filterAllRemaining();
     }
 
-    /*
-     * @return true if there are more rows, false if scanner is done
-     * @throws IOException
-     */
     private boolean nextInternal(int limit) throws IOException {
-      byte [] currentRow = null;
-      boolean filterCurrentRow = false;
       while (true) {
-        KeyValue kv = this.storeHeap.peek();
-        if (kv == null) return false;
-        byte [] row = kv.getRow();
-        boolean samerow = Bytes.equals(currentRow, row);
-        if (samerow && filterCurrentRow) {
-          // Filter all columns until row changes
-          readAndDumpCurrentResult();
-          continue;
-        }
-        if (!samerow) {
-          // Continue on the next row:
-          currentRow = row;
-          filterCurrentRow = false;
-          // See if we passed stopRow
-          if (this.stopRow != null &&
-              comparator.compareRows(this.stopRow, 0, this.stopRow.length, 
-                currentRow, 0, currentRow.length) <= this.isScan) {
-            return false;
+        byte [] currentRow = peekRow();
+        if (isStopRow(currentRow)) {
+          return false;
+        } else if (filterRowKey(currentRow)) {
+          nextRow(currentRow);
+        } else {
+          byte [] nextRow;
+          do {
+            this.storeHeap.next(results, limit);
+            if (limit > 0 && results.size() == limit) {
+              return true;
+            }
+          } while (Bytes.equals(currentRow, nextRow = peekRow()));
+
+          final boolean stopRow = isStopRow(nextRow);
+          if (!stopRow && (results.isEmpty() || filterRow())) {
+            // this seems like a redundant step - we already consumed the row
+            // and there are no leftovers.
+            // the reasons for calling this method are:
+            // 1. reset the filters.
+            // 2. provide a hook to fast forward the row (used by subclasses)
+            nextRow(currentRow);
+            continue;
           }
-          if (hasResults()) return true;
-        }
-        // See if current row should be filtered based on row key
-        if (this.filter != null && this.filter.filterRowKey(row, 0, row.length)) {
-          readAndDumpCurrentResult();
-          resetFilters();
-          filterCurrentRow = true;
-          currentRow = row;
-          continue;
-        }
-        this.storeHeap.next(results, limit);
-        if (limit > 0 && results.size() == limit) {
-          return true;
+          return !stopRow;
         }
       }
     }
 
-    private void readAndDumpCurrentResult() throws IOException {
-      this.storeHeap.next(this.results);
-      this.results.clear();
+    private boolean filterRow() {
+      return filter != null
+          && filter.filterRow();
+    }
+    private boolean filterRowKey(byte[] row) {
+      return filter != null
+          && filter.filterRowKey(row, 0, row.length);
     }
 
-    /*
-     * Do we have results to return or should we continue.  Call when we get to
-     * the end of a row.  Does house cleaning -- clearing results and resetting
-     * filters -- if we are to continue.
-     * @return True if we should return else false if need to keep going.
-     */
-    private boolean hasResults() {
-      if (this.results.isEmpty() ||
-          this.filter != null && this.filter.filterRow()) {
-        // Make sure results is empty, reset filters
-        this.results.clear();
-        resetFilters();
-        return false;
+    protected void nextRow(byte [] currentRow) throws IOException {
+      while (Bytes.equals(currentRow, peekRow())) {
+        this.storeHeap.next(MOCKED_LIST);
       }
-      return true;
+      results.clear();
+      resetFilters();
+    }
+
+    private byte[] peekRow() {
+      KeyValue kv = this.storeHeap.peek();
+      return kv == null ? null : kv.getRow();
+    }
+
+    private boolean isStopRow(byte [] currentRow) {
+      return currentRow == null ||
+          (stopRow != null &&
+          comparator.compareRows(stopRow, 0, stopRow.length,
+              currentRow, 0, currentRow.length) <= isScan);
     }
 
     public synchronized void close() {
@@ -2096,14 +2086,7 @@ public class HRegion implements HConstan
         storeHeap.close();
         storeHeap = null;
       }
-	  this.filterClosed = true;
-    }
-
-    /**
-     * @return the current storeHeap
-     */
-    public synchronized KeyValueHeap getStoreHeap() {
-      return this.storeHeap;
+      this.filterClosed = true;
     }
   }
 
@@ -2829,6 +2812,33 @@ public class HRegion implements HConstan
   }
 
   /**
+   * A mocked list implementation - discards all updates.
+   */
+  private static final List<KeyValue> MOCKED_LIST = new AbstractList<KeyValue>() {
+
+    @Override
+    public void add(int index, KeyValue element) {
+      // do nothing
+    }
+
+    @Override
+    public boolean addAll(int index, Collection<? extends KeyValue> c) {
+      return false; // this list is never changed as a result of an update
+    }
+
+    @Override
+    public KeyValue get(int index) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int size() {
+      return 0;
+    }
+  };
+
+
+  /**
    * Facility for dumping and compacting catalog tables.
    * Only does catalog tables since these are only tables we for sure know
    * schema on.  For usage run: