You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jg...@apache.org on 2009/09/03 01:23:23 UTC

svn commit: r810732 [1/2] - in /hadoop/hbase/branches/0.20: ./ src/java/org/apache/hadoop/hbase/filter/ src/java/org/apache/hadoop/hbase/io/ src/java/org/apache/hadoop/hbase/regionserver/ src/test/org/apache/hadoop/hbase/filter/

Author: jgray
Date: Wed Sep  2 23:23:21 2009
New Revision: 810732

URL: http://svn.apache.org/viewvc?rev=810732&view=rev
Log:
HBASE-1790  filters are not working correctly (HBASE-1710 HBASE-1807 too)

Added:
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/BinaryComparator.java
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/CompareFilter.java
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/QualifierFilter.java
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/RowFilter.java
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/SkipFilter.java
    hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/filter/TestFilter.java
    hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/filter/TestSingleColumnValueFilter.java
Modified:
    hadoop/hbase/branches/0.20/CHANGES.txt
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/InclusiveStopFilter.java
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/PageFilter.java
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/PrefixFilter.java
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/RegexStringComparator.java
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/ValueFilter.java
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/filter/TestFilterList.java
    hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/filter/TestInclusiveStopFilter.java
    hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/filter/TestPageFilter.java
    hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/filter/TestValueFilter.java

Modified: hadoop/hbase/branches/0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/CHANGES.txt?rev=810732&r1=810731&r2=810732&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/CHANGES.txt (original)
+++ hadoop/hbase/branches/0.20/CHANGES.txt Wed Sep  2 23:23:21 2009
@@ -340,6 +340,7 @@
                (Mathias Herberts via Stack)
    HBASE-1804  Puts are permitted (and stored) when including an appended colon
    HBASE-1715  Compaction failure in ScanWildcardColumnTracker.checkColumn
+   HBASE-1790  filters are not working correctly (HBASE-1710 HBASE-1807 too)
 
   IMPROVEMENTS
    HBASE-1089  Add count of regions on filesystem to master UI; add percentage

Added: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/BinaryComparator.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/BinaryComparator.java?rev=810732&view=auto
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/BinaryComparator.java (added)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/BinaryComparator.java Wed Sep  2 23:23:21 2009
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.filter;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * A binary comparator which lexicographically compares against the specified 
+ * byte array using {@link Bytes#compareTo(byte[], byte[])}.
+ */
+public class BinaryComparator implements WritableByteArrayComparable {
+
+  /** Reference bytes; the left-hand operand of every comparison. */
+  private byte [] value;
+
+  /** Writable constructor, do not use; the value is set by readFields. */
+  public BinaryComparator() {
+    super();
+  }
+
+  /**
+   * Creates a comparator around the given reference bytes.
+   * @param value the value to compare against
+   */
+  public BinaryComparator(byte [] value) {
+    this.value = value;
+  }
+
+  /** Replaces the reference bytes with the array returned by Bytes. */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.value = Bytes.readByteArray(in);
+  }
+
+  /** Hands the reference bytes to Bytes for writing. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    Bytes.writeByteArray(out, this.value);
+  }
+
+  /**
+   * Compares the reference bytes (left operand) with the given bytes
+   * (right operand).
+   * @param value bytes to compare the reference bytes against
+   * @return the result of Bytes.compareTo(reference, value)
+   */
+  @Override
+  public int compareTo(byte [] value) {
+    return Bytes.compareTo(this.value, value);
+  }
+}

Added: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/CompareFilter.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/CompareFilter.java?rev=810732&view=auto
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/CompareFilter.java (added)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/CompareFilter.java Wed Sep  2 23:23:21 2009
@@ -0,0 +1,141 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.filter;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.io.ObjectWritable;
+
+/**
+ * This is a generic filter to be used to filter by comparison.  It takes an 
+ * operator (equal, greater, not equal, etc) and a byte [] comparator.
+ * <p>
+ * To filter by row key, use {@link RowFilter}.
+ * <p>
+ * To filter by column qualifier, use {@link QualifierFilter}.
+ * <p>
+ * To filter by value, use {@link SingleColumnValueFilter}.
+ * <p>
+ * These filters can be wrapped with {@link SkipFilter} and {@link WhileMatchFilter}
+ * to add more control.
+ * <p>
+ * Multiple filters can be combined using {@link FilterList}.
+ */
+public abstract class CompareFilter implements Filter {
+
+  /** Comparison operators. */
+  public enum CompareOp {
+    /** less than */
+    LESS,
+    /** less than or equal to */
+    LESS_OR_EQUAL,
+    /** equals */
+    EQUAL,
+    /** not equal */
+    NOT_EQUAL,
+    /** greater than or equal to */
+    GREATER_OR_EQUAL,
+    /** greater than */
+    GREATER;
+  }
+
+  /** Operator that {@link #doCompare} applies to the comparator result. */
+  protected CompareOp compareOp;
+  /** Comparator that candidate bytes are handed to. */
+  protected WritableByteArrayComparable comparator;
+
+  /**
+   * Writable constructor, do not use.
+   */
+  public CompareFilter() {
+  }
+
+  /**
+   * Constructor.
+   * @param compareOp the compare op to match with
+   * @param comparator the comparator to match with
+   */
+  public CompareFilter(final CompareOp compareOp, 
+      final WritableByteArrayComparable comparator) {
+    this.compareOp = compareOp;
+    this.comparator = comparator;
+  }
+
+  /** Default implementation; does nothing. */
+  public void reset() {
+  }
+
+  /** Default implementation; always returns INCLUDE. */
+  public ReturnCode filterKeyValue(KeyValue v) {
+    return ReturnCode.INCLUDE;
+  }
+
+  /** Default implementation; always returns false. */
+  public boolean filterRowKey(byte[] data, int offset, int length) {
+    return false;
+  }
+
+  /** Default implementation; always returns false. */
+  public boolean filterRow() {
+    return false;
+  }
+
+  /** Default implementation; always returns false. */
+  public boolean filterAllRemaining() {
+    return false;
+  }
+
+  /**
+   * Tests a slice of a buffer against the comparator and reports whether the
+   * slice should be filtered out.
+   * <p>
+   * Note the inverted sense: the result is <code>true</code> when the data 
+   * does NOT satisfy <code>compareOp</code>.  The comparator is the receiver 
+   * of the comparison (for example {@link BinaryComparator} compares its own 
+   * value against the data), so with {@link CompareOp#LESS} the data is kept 
+   * only when the comparator result is positive.
+   * @param compareOp operator to apply to the comparator result
+   * @param comparator comparator the data slice is handed to
+   * @param data buffer containing the bytes to test
+   * @param offset index of the first byte to test
+   * @param length number of bytes to test
+   * @return true if the slice fails the comparison and should be filtered
+   * @throws RuntimeException if the operator is not handled by the switch
+   */
+  protected boolean doCompare(final CompareOp compareOp,
+      final WritableByteArrayComparable comparator, final byte [] data,
+      final int offset, final int length) {
+    // The comparator interface takes a whole array, so copy the slice out.
+    int compareResult = 
+      comparator.compareTo(Arrays.copyOfRange(data, offset, 
+        offset + length));
+    // Each case answers "filter this out?", i.e. the negation of the operator.
+    switch (compareOp) {
+      case LESS:
+        return compareResult <= 0;
+      case LESS_OR_EQUAL:
+        return compareResult < 0;
+      case EQUAL:
+        return compareResult != 0;
+      case NOT_EQUAL:
+        return compareResult == 0;
+      case GREATER_OR_EQUAL:
+        return compareResult > 0;
+      case GREATER:
+        return compareResult >= 0;
+      default:
+        throw new RuntimeException("Unknown Compare op " +
+          compareOp.name());
+    }
+  }
+
+  /**
+   * Reads the operator name and then the comparator, in the order that
+   * {@link #write(DataOutput)} emits them.
+   * @param in stream to read from
+   * @throws IOException if reading fails
+   */
+  public void readFields(DataInput in) throws IOException {
+    compareOp = CompareOp.valueOf(in.readUTF());
+    comparator = (WritableByteArrayComparable)
+        HbaseObjectWritable.readObject(in, null);
+  }
+
+  /**
+   * Writes the operator name and then the comparator.
+   * @param out stream to write to
+   * @throws IOException if writing fails
+   */
+  public void write(DataOutput out) throws IOException {
+    out.writeUTF(compareOp.name());
+    // Must be HbaseObjectWritable: readFields decodes with
+    // HbaseObjectWritable.readObject, and the plain Hadoop ObjectWritable
+    // encoding is not interchangeable with it.
+    HbaseObjectWritable.writeObject(out, comparator,
+        WritableByteArrayComparable.class, null);
+  }
+}

Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/InclusiveStopFilter.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/InclusiveStopFilter.java?rev=810732&r1=810731&r2=810732&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/InclusiveStopFilter.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/InclusiveStopFilter.java Wed Sep  2 23:23:21 2009
@@ -35,6 +35,7 @@
  */
 public class InclusiveStopFilter implements Filter {
   private byte [] stopRowKey;
+  private boolean done = false;
 
   public InclusiveStopFilter() {
     super();
@@ -56,12 +57,17 @@
       return false;
     }
     // if stopRowKey is <= buffer, then true, filter row.
-    return Bytes.compareTo(stopRowKey, 0, stopRowKey.length,
-      buffer, offset, length) < 0;
+    int cmp = Bytes.compareTo(stopRowKey, 0, stopRowKey.length,
+      buffer, offset, length);
+    
+    if(cmp < 0) {
+      done = true;
+    }
+    return done;
   }
 
   public boolean filterAllRemaining() {
-    return false;
+    return done;
   }
 
   public ReturnCode filterKeyValue(KeyValue v) {

Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/PageFilter.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/PageFilter.java?rev=810732&r1=810731&r2=810732&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/PageFilter.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/PageFilter.java Wed Sep  2 23:23:21 2009
@@ -29,14 +29,12 @@
  * Implementation of Filter interface that limits results to a specific page
  * size. It terminates scanning once the number of filter-passed rows is >
  * the given page size.
- * 
  * <p>
  * Note that this filter cannot guarantee that the number of results returned
  * to a client are <= page size. This is because the filter is applied
  * separately on different region servers. It does however optimize the scan of
  * individual HRegions by making sure that the page size is never exceeded
  * locally.
- * </p>
  */
 public class PageFilter implements Filter {
   private long pageSize = Long.MAX_VALUE;
@@ -60,16 +58,15 @@
   }
 
   public void reset() {
-    rowsAccepted = 0;
+    // noop
   }
 
   public boolean filterAllRemaining() {
-    return this.rowsAccepted > this.pageSize;
+    return this.rowsAccepted >= this.pageSize;
   }
 
   public boolean filterRowKey(byte[] rowKey, int offset, int length) {
-    this.rowsAccepted++;
-    return filterAllRemaining();
+    return false;
   }
 
   public void readFields(final DataInput in) throws IOException {
@@ -81,10 +78,11 @@
   }
 
   public ReturnCode filterKeyValue(KeyValue v) {
-    return filterAllRemaining() ? ReturnCode.NEXT_ROW : ReturnCode.INCLUDE;
+    return ReturnCode.INCLUDE;
   }
 
   public boolean filterRow() {
-    return filterAllRemaining();
+    this.rowsAccepted++;
+    return this.rowsAccepted > this.pageSize;
   }
 }
\ No newline at end of file

Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/PrefixFilter.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/PrefixFilter.java?rev=810732&r1=810731&r2=810732&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/PrefixFilter.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/PrefixFilter.java Wed Sep  2 23:23:21 2009
@@ -32,6 +32,7 @@
  */
 public class PrefixFilter implements Filter {
   protected byte [] prefix = null;
+  protected boolean passedPrefix = false;
 
   public PrefixFilter(final byte [] prefix) {
     this.prefix = prefix;
@@ -52,12 +53,17 @@
       return true;
     // if they are equal, return false => pass row
     // else return true, filter row
-    return Bytes.compareTo(buffer, offset, this.prefix.length, this.prefix, 0,
-      this.prefix.length) != 0;
+    // if we are past the prefix, set flag
+    int cmp = Bytes.compareTo(buffer, offset, this.prefix.length, this.prefix, 0,
+        this.prefix.length);
+    if(cmp > 0) {
+      passedPrefix = true;
+    }
+    return cmp != 0;
   }
 
   public boolean filterAllRemaining() {
-    return false;
+    return passedPrefix;
   }
 
   public ReturnCode filterKeyValue(KeyValue v) {

Added: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/QualifierFilter.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/QualifierFilter.java?rev=810732&view=auto
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/QualifierFilter.java (added)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/QualifierFilter.java Wed Sep  2 23:23:21 2009
@@ -0,0 +1,68 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.filter;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Get;
+
+/**
+ * This filter is used to filter based on the column qualifier. It takes an 
+ * operator (equal, greater, not equal, etc) and a byte [] comparator for the 
+ * column qualifier portion of a key.
+ * <p>
+ * This filter can be wrapped with {@link WhileMatchFilter} and {@link SkipFilter}
+ * to add more control.
+ * <p>
+ * Multiple filters can be combined using {@link FilterList}.
+ * <p>
+ * If an already known column qualifier is looked for, use {@link Get#addColumn}
+ * directly rather than a filter.
+ */
+public class QualifierFilter extends CompareFilter {
+
+  /** Writable constructor, do not use. */
+  public QualifierFilter() {
+    super();
+  }
+
+  /**
+   * Creates a filter that tests column qualifiers with the given operator 
+   * and comparator.
+   * @param qualifierCompareOp the compare op for column qualifier matching
+   * @param qualifierComparator the comparator for column qualifier matching
+   */
+  public QualifierFilter(final CompareOp qualifierCompareOp,
+      final WritableByteArrayComparable qualifierComparator) {
+    super(qualifierCompareOp, qualifierComparator);
+  }
+
+  /**
+   * Tests the qualifier bytes of the given KeyValue.  A KeyValue whose 
+   * qualifier length is not positive is included without consulting the 
+   * comparator.
+   * @param v the KeyValue to test
+   * @return SKIP if doCompare rejects the qualifier, otherwise INCLUDE
+   */
+  @Override
+  public ReturnCode filterKeyValue(KeyValue v) {
+    final int len = v.getQualifierLength();
+    if (len <= 0) {
+      return ReturnCode.INCLUDE;
+    }
+    final boolean rejected = doCompare(this.compareOp, this.comparator,
+        v.getBuffer(), v.getQualifierOffset(), len);
+    return rejected ? ReturnCode.SKIP : ReturnCode.INCLUDE;
+  }
+}

Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/RegexStringComparator.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/RegexStringComparator.java?rev=810732&r1=810731&r2=810732&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/RegexStringComparator.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/RegexStringComparator.java Wed Sep  2 23:23:21 2009
@@ -27,18 +27,18 @@
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
- * This comparator is for use with ColumnValueFilter, for filtering based on
- * the value of a given column. Use it to test if a given regular expression
- * matches a cell value in the column.
+ * This comparator is for use with {@link CompareFilter} implementations, such 
+ * as {@link RowFilter}, {@link QualifierFilter}, and {@link ValueFilter}, for 
+ * filtering based on the value of a given column. Use it to test if a given 
+ * regular expression matches a cell value in the column.
  * <p>
- * Only EQUAL or NOT_EQUAL tests are valid with this comparator. 
+ * Only EQUAL or NOT_EQUAL {@link CompareOp} comparisons are valid with this 
+ * comparator. 
  * <p>
  * For example:
  * <p>
  * <pre>
- * ColumnValueFilter cvf =
- *   new ColumnValueFilter("col",
- *     ColumnValueFilter.CompareOp.EQUAL,
+ * ValueFilter vf = new ValueFilter(CompareOp.EQUAL,
  *     new RegexStringComparator(
  *       // v4 IP address
  *       "(((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\\.){3,3}" +

Added: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/RowFilter.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/RowFilter.java?rev=810732&view=auto
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/RowFilter.java (added)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/RowFilter.java Wed Sep  2 23:23:21 2009
@@ -0,0 +1,84 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.filter;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+
+/**
+ * This filter is used to filter based on the row key. It takes an operator
+ * (equal, greater, not equal, etc) and a byte [] comparator for the row 
+ * portion of a key.
+ * <p>
+ * This filter can be wrapped with {@link WhileMatchFilter} to add more control.
+ * <p>
+ * Multiple filters can be combined using {@link FilterList}.
+ * <p>
+ * If an already known row range needs to be scanned, use {@link Scan} start
+ * and stop rows directly rather than a filter.
+ */
+public class RowFilter extends CompareFilter {
+
+  /** Set once a row key fails the comparison; cleared only by reset(). */
+  private boolean filterOutRow = false;
+
+  /** Writable constructor, do not use. */
+  public RowFilter() {
+    super();
+  }
+
+  /**
+   * Creates a filter that tests row keys with the given operator and 
+   * comparator.
+   * @param rowCompareOp the compare op for row matching
+   * @param rowComparator the comparator for row matching
+   */
+  public RowFilter(final CompareOp rowCompareOp, 
+      final WritableByteArrayComparable rowComparator) {
+    super(rowCompareOp, rowComparator);
+  }
+
+  /** Clears the row-rejected flag. */
+  @Override
+  public void reset() {
+    filterOutRow = false;
+  }
+
+  /**
+   * @param v the KeyValue being considered (not inspected here)
+   * @return NEXT_ROW if the row-rejected flag is set, otherwise INCLUDE
+   */
+  @Override
+  public ReturnCode filterKeyValue(KeyValue v) {
+    return filterOutRow ? ReturnCode.NEXT_ROW : ReturnCode.INCLUDE;
+  }
+
+  /**
+   * Runs the comparison on the row key slice and records a rejection.  The 
+   * flag is sticky: once set it stays set until reset() is called, even if a 
+   * later key passes the comparison.
+   * @param data buffer containing the row key
+   * @param offset index of the first row key byte
+   * @param length number of row key bytes
+   * @return the current value of the row-rejected flag
+   */
+  @Override
+  public boolean filterRowKey(byte[] data, int offset, int length) {
+    // Always run the comparison, even when the flag is already set.
+    final boolean rejected =
+        doCompare(this.compareOp, this.comparator, data, offset, length);
+    filterOutRow |= rejected;
+    return filterOutRow;
+  }
+
+  /** @return the current value of the row-rejected flag */
+  @Override
+  public boolean filterRow() {
+    return filterOutRow;
+  }
+}
\ No newline at end of file

Added: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java?rev=810732&view=auto
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java (added)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java Wed Sep  2 23:23:21 2009
@@ -0,0 +1,177 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.filter;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * This filter is used to filter cells based on value. It takes a 
+ * {@link CompareFilter.CompareOp} operator (equal, greater, not equal, etc), 
+ * and either a byte [] value or a {@link WritableByteArrayComparable}.
+ * <p>
+ * If we have a byte [] value then we just do a lexicographic compare. For 
+ * example, if passed value is 'b' and cell has 'a' and the compare operator 
+ * is LESS, then the cell passes because 'a' is less than 'b'; with GREATER 
+ * the same cell would be filtered out (return true).  If this is not 
+ * sufficient (eg you want to deserialize a long and then compare it to a fixed 
+ * long value), then you can pass in your own comparator instead.
+ * <p>
+ * You must also specify a family and qualifier.  Only the value of this column 
+ * will be tested.  All other columns are included without being tested.
+ * <p>
+ * To prevent the entire row from being emitted if this filter determines the
+ * column does not pass (it should be filtered), wrap this filter with a
+ * {@link SkipFilter}.
+ * <p>
+ * To filter based on the value of all scanned columns, use {@link ValueFilter}.
+ */
+public class SingleColumnValueFilter implements Filter {
+  static final Log LOG = LogFactory.getLog(SingleColumnValueFilter.class);
+
+  /** Family of the one column whose value is tested. */
+  private byte [] columnFamily;
+  /** Qualifier of the one column whose value is tested. */
+  private byte [] columnQualifier; 
+  /** Operator applied to the comparator result. */
+  private CompareOp compareOp;
+  /** Comparator the cell value is handed to. */
+  private WritableByteArrayComparable comparator;
+
+  /** Writable constructor, do not use. */
+  public SingleColumnValueFilter() {
+    super();
+  }
+
+  /**
+   * Constructor for binary compare of the value of a single column.  The 
+   * value is wrapped in a {@link BinaryComparator} and handed to the 
+   * comparator-based constructor.
+   * 
+   * @param family name of column family
+   * @param qualifier name of column qualifier
+   * @param compareOp operator
+   * @param value value to compare column values against
+   */
+  public SingleColumnValueFilter(final byte [] family, final byte [] qualifier,
+      final CompareOp compareOp, final byte[] value) {
+    this(family, qualifier, compareOp, new BinaryComparator(value));
+  }
+
+  /**
+   * Constructor for compare of the value of a single column using a 
+   * caller-supplied comparator.
+   * 
+   * @param family name of column family
+   * @param qualifier name of column qualifier
+   * @param compareOp operator
+   * @param comparator Comparator to use.
+   */
+  public SingleColumnValueFilter(final byte [] family, final byte [] qualifier,
+      final CompareOp compareOp, final WritableByteArrayComparable comparator) {
+    this.columnFamily = family;
+    this.columnQualifier = qualifier;
+    this.compareOp = compareOp;
+    this.comparator = comparator;
+  }
+
+  /** Row keys are never filtered by this filter; always returns false. */
+  public boolean filterRowKey(byte[] rowKey, int offset, int length) {
+    return false;
+  }
+
+  /**
+   * Tests the value of a KeyValue that belongs to the configured column.  
+   * KeyValues of any other column are included untested, so a row that lacks 
+   * the column has every KeyValue included by this method.
+   * @param keyValue the KeyValue to test
+   * @return NEXT_ROW if the configured column's value fails the comparison, 
+   * otherwise INCLUDE
+   */
+  public ReturnCode filterKeyValue(KeyValue keyValue) {
+    final boolean isTestedColumn =
+        keyValue.matchingColumn(this.columnFamily, this.columnQualifier);
+    if (isTestedColumn && filterColumnValue(keyValue.getBuffer(),
+        keyValue.getValueOffset(), keyValue.getValueLength())) {
+      return ReturnCode.NEXT_ROW;
+    }
+    return ReturnCode.INCLUDE;
+  }
+
+  /**
+   * Reports whether a value slice should be filtered out.  The sense is 
+   * inverted: true means the value does NOT satisfy the operator.
+   * @param data buffer containing the value
+   * @param offset index of the first value byte
+   * @param length number of value bytes
+   * @return true if the value fails the comparison
+   */
+  private boolean filterColumnValue(final byte[] data, final int offset,
+      final int length) {
+    // The comparator interface takes a whole array, so copy the slice out.
+    final byte [] cell = Arrays.copyOfRange(data, offset, offset + length);
+    final int cmp = comparator.compareTo(cell);
+
+    final boolean reject;
+    switch (compareOp) {
+    case LESS:
+      reject = cmp <= 0;
+      break;
+    case LESS_OR_EQUAL:
+      reject = cmp < 0;
+      break;
+    case EQUAL:
+      reject = cmp != 0;
+      break;
+    case NOT_EQUAL:
+      reject = cmp == 0;
+      break;
+    case GREATER_OR_EQUAL:
+      reject = cmp > 0;
+      break;
+    case GREATER:
+      reject = cmp >= 0;
+      break;
+    default:
+      throw new RuntimeException("Unknown Compare op " + compareOp.name());
+    }
+    return reject;
+  }
+
+  /** Always returns false. */
+  public boolean filterAllRemaining() {
+    return false;
+  }
+
+  /** Always returns false. */
+  public boolean filterRow() {
+    return false;
+  }
+
+  /** Does nothing; no per-row state is kept. */
+  public void reset() {
+  }
+
+  /**
+   * Reads family, qualifier, operator name and comparator, in that order.  
+   * A zero-length family or qualifier is stored as null.
+   * @param in stream to read from
+   * @throws IOException if reading fails
+   */
+  public void readFields(final DataInput in) throws IOException {
+    columnFamily = Bytes.readByteArray(in);
+    if (columnFamily.length == 0) {
+      columnFamily = null;
+    }
+    columnQualifier = Bytes.readByteArray(in);
+    if (columnQualifier.length == 0) {
+      columnQualifier = null;
+    }
+    this.compareOp = CompareOp.valueOf(in.readUTF());
+    final Object decoded = HbaseObjectWritable.readObject(in, null);
+    this.comparator = (WritableByteArrayComparable) decoded;
+  }
+
+  /**
+   * Writes family, qualifier, operator name and comparator, in that order.
+   * @param out stream to write to
+   * @throws IOException if writing fails
+   */
+  public void write(final DataOutput out) throws IOException {
+    Bytes.writeByteArray(out, columnFamily);
+    Bytes.writeByteArray(out, columnQualifier);
+    out.writeUTF(this.compareOp.name());
+    HbaseObjectWritable.writeObject(out, this.comparator,
+        WritableByteArrayComparable.class, null);
+  }
+}
\ No newline at end of file

Added: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/SkipFilter.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/SkipFilter.java?rev=810732&view=auto
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/SkipFilter.java (added)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/SkipFilter.java Wed Sep  2 23:23:21 2009
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.filter;
+
+import org.apache.hadoop.hbase.KeyValue;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.DataInput;
+
+/**
+ * A wrapper filter that filters an entire row if any of the KeyValue checks do 
+ * not pass.
+ * <p>
+ * For example, suppose all columns in a row represent weights of different
+ * things, with the values being the actual weights, and we want to filter out
+ * the entire row if any of its weights is zero.  In this case, we want to
+ * prevent rows from being emitted if a single key is filtered.  Combine this
+ * filter with a {@link ValueFilter}:
+ * <p>
+ * <pre>
+ * scan.setFilter(new SkipFilter(new ValueFilter(CompareOp.NOT_EQUAL,
+ *     new BinaryComparator(Bytes.toBytes(0)))));
+ * </pre>
+ * Any row which contained a column whose value was 0 will be filtered out.
+ * Without this filter, the other non-zero valued columns in the row would still 
+ * be emitted.
+ */
+public class SkipFilter implements Filter {
+  private boolean filterRow = false;
+  private Filter filter;
+
+  public SkipFilter() {
+    super();
+  }
+
+  public SkipFilter(Filter filter) {
+    this.filter = filter;
+  }
+
+  public void reset() {
+    filter.reset();
+    filterRow = false;
+  }
+
+  private void changeFR(boolean value) {
+    filterRow = filterRow || value;
+  }
+
+  public boolean filterRowKey(byte[] buffer, int offset, int length) {
+    return false;
+  }
+
+  public boolean filterAllRemaining() {
+    return false;
+  }
+
+  public ReturnCode filterKeyValue(KeyValue v) {
+    ReturnCode c = filter.filterKeyValue(v);
+    changeFR(c != ReturnCode.INCLUDE);
+    return c;
+  }
+
+  public boolean filterRow() {
+    return filterRow;
+  }
+
+  public void write(DataOutput out) throws IOException {
+    out.writeUTF(this.filter.getClass().getName());
+    this.filter.write(out);
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    String className = in.readUTF();
+    try {
+      this.filter = (Filter)(Class.forName(className).newInstance());
+      this.filter.readFields(in);
+    } catch (InstantiationException e) {
+      throw new RuntimeException("Failed deserialize.", e);
+    } catch (IllegalAccessException e) {
+      throw new RuntimeException("Failed deserialize.", e);
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException("Failed deserialize.", e);
+    }
+  }
+}

Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/ValueFilter.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/ValueFilter.java?rev=810732&r1=810731&r2=810732&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/ValueFilter.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/ValueFilter.java Wed Sep  2 23:23:21 2009
@@ -1,5 +1,5 @@
 /**
- * Copyright 2008 The Apache Software Foundation
+ * Copyright 2009 The Apache Software Foundation
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -17,217 +17,49 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.hadoop.hbase.filter;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Arrays;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.hbase.client.Get;
 
 /**
- * This filter is used to filter based on the value of a given column. It takes
- * an operator (equal, greater, not equal, etc) and either a byte [] value or a
- * byte [] comparator. If we have a byte [] value then we just do a
- * lexicographic compare. For example, if passed value is 'b' and cell has 'a'
- * and the compare operator is LESS, then we will filter out this cell (return
- * true).  If this is not sufficient (eg you want to deserialize
- * a long and then compare it to a fixed long value), then you can pass in your
- * own comparator instead.
- * */
-public class ValueFilter implements Filter {
-  static final Log LOG = LogFactory.getLog(ValueFilter.class);
-
-    /** Comparison operators. */
-  public enum CompareOp {
-    /** less than */
-    LESS,
-    /** less than or equal to */
-    LESS_OR_EQUAL,
-    /** equals */
-    EQUAL,
-    /** not equal */
-    NOT_EQUAL,
-    /** greater than or equal to */
-    GREATER_OR_EQUAL,
-    /** greater than */
-    GREATER;
-  }
-
-  private byte [] columnFamily;
-  private byte [] columnQualifier; 
-  private CompareOp compareOp;
-  private byte [] value;
-  private WritableByteArrayComparable comparator;
-  private boolean filterIfColumnMissing;
-
-  private boolean filterThisRow = false;
-  private boolean foundColValue = false;
-
-  ValueFilter() {
-    // for Writable
-  }
-
-  /**
-   * Constructor.
-   * 
-   * @param family name of column family
-   * @param qualifier name of column qualifier
-   * @param compareOp operator
-   * @param value value to compare column values against
-   */
-  public ValueFilter(final byte [] family, final byte [] qualifier,
-      final CompareOp compareOp, final byte[] value) {
-    this(family, qualifier, compareOp, value, true);
-  }
-
-  /**
-   * Constructor.
-   * 
-   * @param family name of column family
-   * @param qualifier name of column qualifier
-   * @param compareOp operator
-   * @param value value to compare column values against
-   * @param filterIfColumnMissing if true then we will filter rows that don't
-   * have the column.
-   */
-  public ValueFilter(final byte [] family, final byte [] qualifier,
-      final CompareOp compareOp,
-      final byte[] value, boolean filterIfColumnMissing) {
-    this.columnFamily = family;
-    this.columnQualifier = qualifier;
-    this.compareOp = compareOp;
-    this.value = value;
-    this.filterIfColumnMissing = filterIfColumnMissing;
-  }
+ * This filter is used to filter based on column value. It takes an 
+ * operator (equal, greater, not equal, etc) and a byte [] comparator for the 
+ * cell value.
+ * <p>
+ * This filter can be wrapped with {@link WhileMatchFilter} and {@link SkipFilter}
+ * to add more control.
+ * <p>
+ * Multiple filters can be combined using {@link FilterList}.
+ * <p>
+ * To test the value of a single qualifier when scanning multiple qualifiers,
+ * use {@link SingleColumnValueFilter}.
+ */
+public class ValueFilter extends CompareFilter {
 
   /**
-   * Constructor.
-   * 
-   * @param family name of column family
-   * @param qualifier name of column qualifier
-   * @param compareOp operator
-   * @param comparator Comparator to use.
+   * Writable constructor, do not use.
    */
-  public ValueFilter(final byte [] family, final byte [] qualifier,
-      final CompareOp compareOp,
-      final WritableByteArrayComparable comparator) {
-    this(family, qualifier, compareOp, comparator, true);
+  public ValueFilter() {
   }
 
   /**
    * Constructor.
-   * 
-   * @param family name of column family
-   * @param qualifier name of column qualifier
-   * @param compareOp operator
-   * @param comparator Comparator to use.
-   * @param filterIfColumnMissing if true then we will filter rows that don't
-   * have the column.
+   * @param valueCompareOp the compare op for value matching
+   * @param valueComparator the comparator for value matching
    */
-  public ValueFilter(final byte [] family, final byte [] qualifier,
-      final CompareOp compareOp,
-      final WritableByteArrayComparable comparator,
-      boolean filterIfColumnMissing) {
-    this.columnFamily = family;
-    this.columnQualifier = qualifier;
-    this.compareOp = compareOp;
-    this.comparator = comparator;
-    this.filterIfColumnMissing = filterIfColumnMissing;
-  }
-
-  public boolean filterRowKey(byte[] rowKey, int offset, int length) {
-    return false;
+  public ValueFilter(final CompareOp valueCompareOp,
+      final WritableByteArrayComparable valueComparator) {
+    super(valueCompareOp, valueComparator);
   }
 
-  public ReturnCode filterKeyValue(KeyValue keyValue) {
-    if (!keyValue.matchingColumn(this.columnFamily, this.columnQualifier)) {
-      return ReturnCode.INCLUDE;
-    }
-    this.foundColValue = true;
-    boolean filtered = filterColumnValue(keyValue.getBuffer(),
-      keyValue.getValueOffset(), keyValue.getValueLength());
-    if (filtered) {
-      this.filterThisRow = true;
-      return ReturnCode.NEXT_ROW;
+  @Override
+  public ReturnCode filterKeyValue(KeyValue v) {
+    if (doCompare(this.compareOp, this.comparator, v.getBuffer(), 
+        v.getValueOffset(), v.getValueLength())) {
+      return ReturnCode.SKIP;
     }
     return ReturnCode.INCLUDE;
   }
-
-  private boolean filterColumnValue(final byte[] data, final int offset,
-      final int length) {
-    int compareResult;
-    if (comparator != null) {
-      compareResult = comparator.compareTo(Arrays.copyOfRange(data, offset,
-          offset + length));
-    } else {
-      compareResult = Bytes.compareTo(value, 0, value.length, data, offset,
-          length);
-    }
-
-    switch (compareOp) {
-    case LESS:
-      return compareResult <= 0;
-    case LESS_OR_EQUAL:
-      return compareResult < 0;
-    case EQUAL:
-      return compareResult != 0;
-    case NOT_EQUAL:
-      return compareResult == 0;
-    case GREATER_OR_EQUAL:
-      return compareResult > 0;
-    case GREATER:
-      return compareResult >= 0;
-    default:
-      throw new RuntimeException("Unknown Compare op " + compareOp.name());
-    }
-  }
-
-  public boolean filterAllRemaining() {
-    return false;
-  }
-
-  public boolean filterRow() {
-    return filterThisRow || (filterIfColumnMissing && !foundColValue);
-  }
-
-  public void reset() {    
-    filterThisRow = false;
-    foundColValue = false;
-  }
-
-  public void readFields(final DataInput in) throws IOException {
-    int valueLen = in.readInt();
-    if (valueLen > 0) {
-      value = new byte[valueLen];
-      in.readFully(value);
-    }
-    this.columnFamily = Bytes.readByteArray(in);
-    this.columnQualifier = Bytes.readByteArray(in);
-    compareOp = CompareOp.valueOf(in.readUTF());
-    comparator = (WritableByteArrayComparable) ObjectWritable.readObject(in,
-        new HBaseConfiguration());
-    filterIfColumnMissing = in.readBoolean();
-  }
-
-  public void write(final DataOutput out) throws IOException {
-    if (value == null) {
-      out.writeInt(0);
-    } else {
-      out.writeInt(value.length);
-      out.write(value);
-    }
-    Bytes.writeByteArray(out, this.columnFamily);
-    Bytes.writeByteArray(out, this.columnQualifier);
-    out.writeUTF(compareOp.name());
-    ObjectWritable.writeObject(out, comparator,
-        WritableByteArrayComparable.class, new HBaseConfiguration());
-    out.writeBoolean(filterIfColumnMissing);
-  }
-}
\ No newline at end of file
+}

Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java?rev=810732&r1=810731&r2=810732&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java Wed Sep  2 23:23:21 2009
@@ -143,7 +143,14 @@
     addToMap(PageFilter.class, code++);
     addToMap(InclusiveStopFilter.class, code++);
     addToMap(ColumnCountGetFilter.class, code++);
+    addToMap(SingleColumnValueFilter.class, code++);
+    addToMap(BinaryComparator.class, code++);
+    addToMap(CompareFilter.class, code++);
+    addToMap(RowFilter.class, code++);
     addToMap(ValueFilter.class, code++);
+    addToMap(QualifierFilter.class, code++);
+    addToMap(SkipFilter.class, code++);
+    addToMap(WritableByteArrayComparable.class, code++);
   }
   
   private Class<?> declaredClass;
@@ -400,7 +407,7 @@
       if (b.byteValue() == NOT_ENCODED) {
         String className = Text.readString(in);
         try {
-          instanceClass = conf.getClassByName(className);
+          instanceClass = getClassByName(conf, className);
         } catch (ClassNotFoundException e) {
           throw new RuntimeException("Can't find class " + className);
         }
@@ -422,6 +429,19 @@
     return instance;
   }
 
+  @SuppressWarnings("unchecked")
+  private static Class getClassByName(Configuration conf, String className) 
+  throws ClassNotFoundException {
+    if(conf != null) {
+      return conf.getClassByName(className);
+    }
+    ClassLoader cl = Thread.currentThread().getContextClassLoader();
+    if(cl == null) {
+      cl = HbaseObjectWritable.class.getClassLoader();
+    }
+    return Class.forName(className, true, cl);
+  }
+  
   private static void addToMap(final Class<?> clazz, final byte code) {
     CLASS_TO_CODE.put(clazz, code);
     CODE_TO_CLASS.put(code, clazz);

Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=810732&r1=810731&r2=810732&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Wed Sep  2 23:23:21 2009
@@ -1736,6 +1736,9 @@
       }
       outResults.addAll(results);
       resetFilters();
+      if(filter != null && filter.filterAllRemaining()) {
+        return false;
+      }
       return returnResult;
     }
 
@@ -1759,6 +1762,9 @@
         // see if current row should be filtered based on row key
         if ((filter != null && filter.filterRowKey(row, 0, row.length)) ||
             (oldFilter != null && oldFilter.filterRowKey(row, 0, row.length))) {
+          if(!results.isEmpty() && !Bytes.equals(currentRow, row)) {
+            return true;
+          }
           this.storeHeap.next(results);
           results.clear();
           resetFilters();

Added: hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/filter/TestFilter.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/filter/TestFilter.java?rev=810732&view=auto
==============================================================================
--- hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/filter/TestFilter.java (added)
+++ hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/filter/TestFilter.java Wed Sep  2 23:23:21 2009
@@ -0,0 +1,868 @@
+package org.apache.hadoop.hbase.filter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestCase;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.FilterList.Operator;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Test filters at the HRegion doorstep.
+ */
+public class TestFilter extends HBaseTestCase {
+  private final Log LOG = LogFactory.getLog(this.getClass());
+  private HRegion region;
+  
+  //
+  // Rows, Qualifiers, and Values are in two groups, One and Two.
+  //
+
+  private static final byte [][] ROWS_ONE = {
+      Bytes.toBytes("testRowOne-0"), Bytes.toBytes("testRowOne-1"),
+      Bytes.toBytes("testRowOne-2"), Bytes.toBytes("testRowOne-3")
+  };
+
+  private static final byte [][] ROWS_TWO = {
+      Bytes.toBytes("testRowTwo-0"), Bytes.toBytes("testRowTwo-1"),
+      Bytes.toBytes("testRowTwo-2"), Bytes.toBytes("testRowTwo-3")
+  };
+  
+  private static final byte [][] FAMILIES = {
+    Bytes.toBytes("testFamilyOne"), Bytes.toBytes("testFamilyTwo")
+  };
+
+  private static final byte [][] QUALIFIERS_ONE = {
+    Bytes.toBytes("testQualifierOne-0"), Bytes.toBytes("testQualifierOne-1"),
+    Bytes.toBytes("testQualifierOne-2"), Bytes.toBytes("testQualifierOne-3")
+  };
+  
+  private static final byte [][] QUALIFIERS_TWO = {
+    Bytes.toBytes("testQualifierTwo-0"), Bytes.toBytes("testQualifierTwo-1"),
+    Bytes.toBytes("testQualifierTwo-2"), Bytes.toBytes("testQualifierTwo-3")
+  };
+  
+  private static final byte [][] VALUES = {
+    Bytes.toBytes("testValueOne"), Bytes.toBytes("testValueTwo")
+  };
+  
+  private long numRows = ROWS_ONE.length + ROWS_TWO.length;
+  private long colsPerRow = FAMILIES.length * QUALIFIERS_ONE.length;
+    
+  
+  protected void setUp() throws Exception {
+    super.setUp();
+    HTableDescriptor htd = new HTableDescriptor(getName());
+    htd.addFamily(new HColumnDescriptor(FAMILIES[0]));
+    htd.addFamily(new HColumnDescriptor(FAMILIES[1]));
+    HRegionInfo info = new HRegionInfo(htd, null, null, false);
+    this.region = HRegion.createHRegion(info, this.testDir, this.conf);
+    
+    // Insert first half
+    for(byte [] ROW : ROWS_ONE) {
+      Put p = new Put(ROW);
+      for(byte [] QUALIFIER : QUALIFIERS_ONE) {
+        p.add(FAMILIES[0], QUALIFIER, VALUES[0]);
+      }
+      this.region.put(p);
+    }
+    for(byte [] ROW : ROWS_TWO) {
+      Put p = new Put(ROW);
+      for(byte [] QUALIFIER : QUALIFIERS_TWO) {
+        p.add(FAMILIES[1], QUALIFIER, VALUES[1]);
+      }
+      this.region.put(p);
+    }
+    
+    // Flush
+    this.region.flushcache();
+    
+    // Insert second half (reverse families)
+    for(byte [] ROW : ROWS_ONE) {
+      Put p = new Put(ROW);
+      for(byte [] QUALIFIER : QUALIFIERS_ONE) {
+        p.add(FAMILIES[1], QUALIFIER, VALUES[0]);
+      }
+      this.region.put(p);
+    }
+    for(byte [] ROW : ROWS_TWO) {
+      Put p = new Put(ROW);
+      for(byte [] QUALIFIER : QUALIFIERS_TWO) {
+        p.add(FAMILIES[0], QUALIFIER, VALUES[1]);
+      }
+      this.region.put(p);
+    }
+    
+    // Delete the second qualifier from all rows and families
+    for(byte [] ROW : ROWS_ONE) {
+      Delete d = new Delete(ROW);
+      d.deleteColumns(FAMILIES[0], QUALIFIERS_ONE[1]);
+      d.deleteColumns(FAMILIES[1], QUALIFIERS_ONE[1]);
+      this.region.delete(d, null, false);
+    }    
+    for(byte [] ROW : ROWS_TWO) {
+      Delete d = new Delete(ROW);
+      d.deleteColumns(FAMILIES[0], QUALIFIERS_TWO[1]);
+      d.deleteColumns(FAMILIES[1], QUALIFIERS_TWO[1]);
+      this.region.delete(d, null, false);
+    }
+    colsPerRow -= 2;
+    
+    // Delete the second rows from both groups, one column at a time
+    for(byte [] QUALIFIER : QUALIFIERS_ONE) {
+      Delete d = new Delete(ROWS_ONE[1]);
+      d.deleteColumns(FAMILIES[0], QUALIFIER);
+      d.deleteColumns(FAMILIES[1], QUALIFIER);
+      this.region.delete(d, null, false);
+    }
+    for(byte [] QUALIFIER : QUALIFIERS_TWO) {
+      Delete d = new Delete(ROWS_TWO[1]);
+      d.deleteColumns(FAMILIES[0], QUALIFIER);
+      d.deleteColumns(FAMILIES[1], QUALIFIER);
+      this.region.delete(d, null, false);
+    }
+    numRows -= 2;
+  }
+
+  protected void tearDown() throws Exception {
+    super.tearDown();
+    this.region.close();
+  }
+
+  public void testNoFilter() throws Exception {
+    
+    // No filter
+    long expectedRows = this.numRows;
+    long expectedKeys = this.colsPerRow;
+    
+    // Both families
+    Scan s = new Scan();
+    verifyScan(s, expectedRows, expectedKeys);
+
+    // One family
+    s = new Scan();
+    s.addFamily(FAMILIES[0]);
+    verifyScan(s, expectedRows, expectedKeys/2);
+  }
+  
+  public void testPrefixFilter() throws Exception {
+    
+    // Grab rows from group one (half of total)
+    
+    long expectedRows = this.numRows / 2;
+    long expectedKeys = this.colsPerRow;
+    
+    Scan s = new Scan();
+    s.setFilter(new PrefixFilter(Bytes.toBytes("testRowOne")));
+
+    verifyScan(s, expectedRows, expectedKeys);
+    
+  }
+  
+  public void testPageFilter() throws Exception {
+    
+    // KVs in first 6 rows
+    KeyValue [] expectedKVs = {
+      // testRowOne-0
+      new KeyValue(ROWS_ONE[0], FAMILIES[0], QUALIFIERS_ONE[0], VALUES[0]),
+      new KeyValue(ROWS_ONE[0], FAMILIES[0], QUALIFIERS_ONE[2], VALUES[0]),
+      new KeyValue(ROWS_ONE[0], FAMILIES[0], QUALIFIERS_ONE[3], VALUES[0]),
+      new KeyValue(ROWS_ONE[0], FAMILIES[1], QUALIFIERS_ONE[0], VALUES[0]),
+      new KeyValue(ROWS_ONE[0], FAMILIES[1], QUALIFIERS_ONE[2], VALUES[0]),
+      new KeyValue(ROWS_ONE[0], FAMILIES[1], QUALIFIERS_ONE[3], VALUES[0]),
+      // testRowOne-2
+      new KeyValue(ROWS_ONE[2], FAMILIES[0], QUALIFIERS_ONE[0], VALUES[0]),
+      new KeyValue(ROWS_ONE[2], FAMILIES[0], QUALIFIERS_ONE[2], VALUES[0]),
+      new KeyValue(ROWS_ONE[2], FAMILIES[0], QUALIFIERS_ONE[3], VALUES[0]),
+      new KeyValue(ROWS_ONE[2], FAMILIES[1], QUALIFIERS_ONE[0], VALUES[0]),
+      new KeyValue(ROWS_ONE[2], FAMILIES[1], QUALIFIERS_ONE[2], VALUES[0]),
+      new KeyValue(ROWS_ONE[2], FAMILIES[1], QUALIFIERS_ONE[3], VALUES[0]),
+      // testRowOne-3
+      new KeyValue(ROWS_ONE[3], FAMILIES[0], QUALIFIERS_ONE[0], VALUES[0]),
+      new KeyValue(ROWS_ONE[3], FAMILIES[0], QUALIFIERS_ONE[2], VALUES[0]),
+      new KeyValue(ROWS_ONE[3], FAMILIES[0], QUALIFIERS_ONE[3], VALUES[0]),
+      new KeyValue(ROWS_ONE[3], FAMILIES[1], QUALIFIERS_ONE[0], VALUES[0]),
+      new KeyValue(ROWS_ONE[3], FAMILIES[1], QUALIFIERS_ONE[2], VALUES[0]),
+      new KeyValue(ROWS_ONE[3], FAMILIES[1], QUALIFIERS_ONE[3], VALUES[0]),
+      // testRowTwo-0
+      new KeyValue(ROWS_TWO[0], FAMILIES[0], QUALIFIERS_TWO[0], VALUES[1]),
+      new KeyValue(ROWS_TWO[0], FAMILIES[0], QUALIFIERS_TWO[2], VALUES[1]),
+      new KeyValue(ROWS_TWO[0], FAMILIES[0], QUALIFIERS_TWO[3], VALUES[1]),
+      new KeyValue(ROWS_TWO[0], FAMILIES[1], QUALIFIERS_TWO[0], VALUES[1]),
+      new KeyValue(ROWS_TWO[0], FAMILIES[1], QUALIFIERS_TWO[2], VALUES[1]),
+      new KeyValue(ROWS_TWO[0], FAMILIES[1], QUALIFIERS_TWO[3], VALUES[1]),
+      // testRowTwo-2
+      new KeyValue(ROWS_TWO[2], FAMILIES[0], QUALIFIERS_TWO[0], VALUES[1]),
+      new KeyValue(ROWS_TWO[2], FAMILIES[0], QUALIFIERS_TWO[2], VALUES[1]),
+      new KeyValue(ROWS_TWO[2], FAMILIES[0], QUALIFIERS_TWO[3], VALUES[1]),
+      new KeyValue(ROWS_TWO[2], FAMILIES[1], QUALIFIERS_TWO[0], VALUES[1]),
+      new KeyValue(ROWS_TWO[2], FAMILIES[1], QUALIFIERS_TWO[2], VALUES[1]),
+      new KeyValue(ROWS_TWO[2], FAMILIES[1], QUALIFIERS_TWO[3], VALUES[1]),
+      // testRowTwo-3
+      new KeyValue(ROWS_TWO[3], FAMILIES[0], QUALIFIERS_TWO[0], VALUES[1]),
+      new KeyValue(ROWS_TWO[3], FAMILIES[0], QUALIFIERS_TWO[2], VALUES[1]),
+      new KeyValue(ROWS_TWO[3], FAMILIES[0], QUALIFIERS_TWO[3], VALUES[1]),
+      new KeyValue(ROWS_TWO[3], FAMILIES[1], QUALIFIERS_TWO[0], VALUES[1]),
+      new KeyValue(ROWS_TWO[3], FAMILIES[1], QUALIFIERS_TWO[2], VALUES[1]),
+      new KeyValue(ROWS_TWO[3], FAMILIES[1], QUALIFIERS_TWO[3], VALUES[1])
+    };
+    
+    // Grab all 6 rows
+    long expectedRows = 6;
+    long expectedKeys = this.colsPerRow;
+    Scan s = new Scan();
+    s.setFilter(new PageFilter(expectedRows));
+    verifyScan(s, expectedRows, expectedKeys);
+    s.setFilter(new PageFilter(expectedRows));
+    verifyScanFull(s, expectedKVs);
+    
+    // Grab first 4 rows (6 cols per row)
+    expectedRows = 4;
+    expectedKeys = this.colsPerRow;
+    s = new Scan();
+    s.setFilter(new PageFilter(expectedRows));
+    verifyScan(s, expectedRows, expectedKeys);
+    s.setFilter(new PageFilter(expectedRows));
+    verifyScanFull(s, Arrays.copyOf(expectedKVs, 24));
+    
+    // Grab first 2 rows
+    expectedRows = 2;
+    expectedKeys = this.colsPerRow;
+    s = new Scan();
+    s.setFilter(new PageFilter(expectedRows));
+    verifyScan(s, expectedRows, expectedKeys);
+    s.setFilter(new PageFilter(expectedRows));
+    verifyScanFull(s, Arrays.copyOf(expectedKVs, 12));
+
+    // Grab first row
+    expectedRows = 1;
+    expectedKeys = this.colsPerRow;
+    s = new Scan();
+    s.setFilter(new PageFilter(expectedRows));
+    verifyScan(s, expectedRows, expectedKeys);
+    s.setFilter(new PageFilter(expectedRows));
+    verifyScanFull(s, Arrays.copyOf(expectedKVs, 6));
+    
+  }
+  
+  public void testInclusiveStopFilter() throws IOException {
+
+    // Grab rows from group one
+    
+    // If we just use start/stop row, we get total/2 - 1 rows
+    long expectedRows = (this.numRows / 2) - 1;
+    long expectedKeys = this.colsPerRow;
+    Scan s = new Scan(Bytes.toBytes("testRowOne-0"), 
+        Bytes.toBytes("testRowOne-3"));
+    verifyScan(s, expectedRows, expectedKeys);
+    
+    // Now use start row with inclusive stop filter
+    expectedRows = this.numRows / 2;
+    s = new Scan(Bytes.toBytes("testRowOne-0"));
+    s.setFilter(new InclusiveStopFilter(Bytes.toBytes("testRowOne-3")));
+    verifyScan(s, expectedRows, expectedKeys);
+
+    // Grab rows from group two
+    
+    // If we just use start/stop row, we get total/2 - 1 rows
+    expectedRows = (this.numRows / 2) - 1;
+    expectedKeys = this.colsPerRow;
+    s = new Scan(Bytes.toBytes("testRowTwo-0"), 
+        Bytes.toBytes("testRowTwo-3"));
+    verifyScan(s, expectedRows, expectedKeys);
+    
+    // Now use start row with inclusive stop filter
+    expectedRows = this.numRows / 2;
+    s = new Scan(Bytes.toBytes("testRowTwo-0"));
+    s.setFilter(new InclusiveStopFilter(Bytes.toBytes("testRowTwo-3")));
+    verifyScan(s, expectedRows, expectedKeys);
+
+  }
+  
+  public void testQualifierFilter() throws IOException {
+    
+    // Match two keys (one from each family) in half the rows
+    long expectedRows = this.numRows / 2;
+    long expectedKeys = 2;
+    Filter f = new QualifierFilter(CompareOp.EQUAL,
+        new BinaryComparator(Bytes.toBytes("testQualifierOne-2")));
+    Scan s = new Scan();
+    s.setFilter(f);
+    verifyScanNoEarlyOut(s, expectedRows, expectedKeys);
+    
+    // Match keys less than same qualifier
+    // Expect only two keys (one from each family) in half the rows
+    expectedRows = this.numRows / 2;
+    expectedKeys = 2;
+    f = new QualifierFilter(CompareOp.LESS,
+        new BinaryComparator(Bytes.toBytes("testQualifierOne-2")));
+    s = new Scan();
+    s.setFilter(f);
+    verifyScanNoEarlyOut(s, expectedRows, expectedKeys);
+    
+    // Match keys less than or equal
+    // Expect four keys (two from each family) in half the rows
+    expectedRows = this.numRows / 2;
+    expectedKeys = 4;
+    f = new QualifierFilter(CompareOp.LESS_OR_EQUAL,
+        new BinaryComparator(Bytes.toBytes("testQualifierOne-2")));
+    s = new Scan();
+    s.setFilter(f);
+    verifyScanNoEarlyOut(s, expectedRows, expectedKeys);
+    
+    // Match keys not equal
+    // Expect four keys (two from each family)
+    // Only look in first group of rows
+    expectedRows = this.numRows / 2;
+    expectedKeys = 4;
+    f = new QualifierFilter(CompareOp.NOT_EQUAL,
+        new BinaryComparator(Bytes.toBytes("testQualifierOne-2")));
+    s = new Scan(HConstants.EMPTY_START_ROW, Bytes.toBytes("testRowTwo"));
+    s.setFilter(f);
+    verifyScanNoEarlyOut(s, expectedRows, expectedKeys);
+    
+    // Match keys greater or equal
+    // Expect four keys (two from each family)
+    // Only look in first group of rows
+    expectedRows = this.numRows / 2;
+    expectedKeys = 4;
+    f = new QualifierFilter(CompareOp.GREATER_OR_EQUAL,
+        new BinaryComparator(Bytes.toBytes("testQualifierOne-2")));
+    s = new Scan(HConstants.EMPTY_START_ROW, Bytes.toBytes("testRowTwo"));
+    s.setFilter(f);
+    verifyScanNoEarlyOut(s, expectedRows, expectedKeys);
+    
+    // Match keys greater
+    // Expect two keys (one from each family)
+    // Only look in first group of rows
+    expectedRows = this.numRows / 2;
+    expectedKeys = 2;
+    f = new QualifierFilter(CompareOp.GREATER,
+        new BinaryComparator(Bytes.toBytes("testQualifierOne-2")));
+    s = new Scan(HConstants.EMPTY_START_ROW, Bytes.toBytes("testRowTwo"));
+    s.setFilter(f);
+    verifyScanNoEarlyOut(s, expectedRows, expectedKeys);
+    
+    // Match keys not equal to
+    // Look across rows and fully validate the keys and ordering
+    // Expect varied numbers of keys, 4 per row in group one, 6 per row in group two
+    f = new QualifierFilter(CompareOp.NOT_EQUAL,
+        new BinaryComparator(QUALIFIERS_ONE[2]));
+    s = new Scan();
+    s.setFilter(f);
+    
+    KeyValue [] kvs = {
+        // testRowOne-0
+        new KeyValue(ROWS_ONE[0], FAMILIES[0], QUALIFIERS_ONE[0], VALUES[0]),
+        new KeyValue(ROWS_ONE[0], FAMILIES[0], QUALIFIERS_ONE[3], VALUES[0]),
+        new KeyValue(ROWS_ONE[0], FAMILIES[1], QUALIFIERS_ONE[0], VALUES[0]),
+        new KeyValue(ROWS_ONE[0], FAMILIES[1], QUALIFIERS_ONE[3], VALUES[0]),
+        // testRowOne-2
+        new KeyValue(ROWS_ONE[2], FAMILIES[0], QUALIFIERS_ONE[0], VALUES[0]),
+        new KeyValue(ROWS_ONE[2], FAMILIES[0], QUALIFIERS_ONE[3], VALUES[0]),
+        new KeyValue(ROWS_ONE[2], FAMILIES[1], QUALIFIERS_ONE[0], VALUES[0]),
+        new KeyValue(ROWS_ONE[2], FAMILIES[1], QUALIFIERS_ONE[3], VALUES[0]),
+        // testRowOne-3
+        new KeyValue(ROWS_ONE[3], FAMILIES[0], QUALIFIERS_ONE[0], VALUES[0]),
+        new KeyValue(ROWS_ONE[3], FAMILIES[0], QUALIFIERS_ONE[3], VALUES[0]),
+        new KeyValue(ROWS_ONE[3], FAMILIES[1], QUALIFIERS_ONE[0], VALUES[0]),
+        new KeyValue(ROWS_ONE[3], FAMILIES[1], QUALIFIERS_ONE[3], VALUES[0]),
+        // testRowTwo-0
+        new KeyValue(ROWS_TWO[0], FAMILIES[0], QUALIFIERS_TWO[0], VALUES[1]),
+        new KeyValue(ROWS_TWO[0], FAMILIES[0], QUALIFIERS_TWO[2], VALUES[1]),
+        new KeyValue(ROWS_TWO[0], FAMILIES[0], QUALIFIERS_TWO[3], VALUES[1]),
+        new KeyValue(ROWS_TWO[0], FAMILIES[1], QUALIFIERS_TWO[0], VALUES[1]),
+        new KeyValue(ROWS_TWO[0], FAMILIES[1], QUALIFIERS_TWO[2], VALUES[1]),
+        new KeyValue(ROWS_TWO[0], FAMILIES[1], QUALIFIERS_TWO[3], VALUES[1]),
+        // testRowTwo-2
+        new KeyValue(ROWS_TWO[2], FAMILIES[0], QUALIFIERS_TWO[0], VALUES[1]),
+        new KeyValue(ROWS_TWO[2], FAMILIES[0], QUALIFIERS_TWO[2], VALUES[1]),
+        new KeyValue(ROWS_TWO[2], FAMILIES[0], QUALIFIERS_TWO[3], VALUES[1]),
+        new KeyValue(ROWS_TWO[2], FAMILIES[1], QUALIFIERS_TWO[0], VALUES[1]),
+        new KeyValue(ROWS_TWO[2], FAMILIES[1], QUALIFIERS_TWO[2], VALUES[1]),
+        new KeyValue(ROWS_TWO[2], FAMILIES[1], QUALIFIERS_TWO[3], VALUES[1]),
+        // testRowTwo-3
+        new KeyValue(ROWS_TWO[3], FAMILIES[0], QUALIFIERS_TWO[0], VALUES[1]),
+        new KeyValue(ROWS_TWO[3], FAMILIES[0], QUALIFIERS_TWO[2], VALUES[1]),
+        new KeyValue(ROWS_TWO[3], FAMILIES[0], QUALIFIERS_TWO[3], VALUES[1]),
+        new KeyValue(ROWS_TWO[3], FAMILIES[1], QUALIFIERS_TWO[0], VALUES[1]),
+        new KeyValue(ROWS_TWO[3], FAMILIES[1], QUALIFIERS_TWO[2], VALUES[1]),
+        new KeyValue(ROWS_TWO[3], FAMILIES[1], QUALIFIERS_TWO[3], VALUES[1]),
+    };
+    verifyScanFull(s, kvs);
+     
+    
+    // Test across rows and groups with a regex
+    // Filter out "test*-2"
+    // Expect 4 keys per row across both groups
+    f = new QualifierFilter(CompareOp.NOT_EQUAL,
+        new RegexStringComparator("test.+-2"));
+    s = new Scan();
+    s.setFilter(f);
+    
+    kvs = new KeyValue [] {
+        // testRowOne-0
+        new KeyValue(ROWS_ONE[0], FAMILIES[0], QUALIFIERS_ONE[0], VALUES[0]),
+        new KeyValue(ROWS_ONE[0], FAMILIES[0], QUALIFIERS_ONE[3], VALUES[0]),
+        new KeyValue(ROWS_ONE[0], FAMILIES[1], QUALIFIERS_ONE[0], VALUES[0]),
+        new KeyValue(ROWS_ONE[0], FAMILIES[1], QUALIFIERS_ONE[3], VALUES[0]),
+        // testRowOne-2
+        new KeyValue(ROWS_ONE[2], FAMILIES[0], QUALIFIERS_ONE[0], VALUES[0]),
+        new KeyValue(ROWS_ONE[2], FAMILIES[0], QUALIFIERS_ONE[3], VALUES[0]),
+        new KeyValue(ROWS_ONE[2], FAMILIES[1], QUALIFIERS_ONE[0], VALUES[0]),
+        new KeyValue(ROWS_ONE[2], FAMILIES[1], QUALIFIERS_ONE[3], VALUES[0]),
+        // testRowOne-3
+        new KeyValue(ROWS_ONE[3], FAMILIES[0], QUALIFIERS_ONE[0], VALUES[0]),
+        new KeyValue(ROWS_ONE[3], FAMILIES[0], QUALIFIERS_ONE[3], VALUES[0]),
+        new KeyValue(ROWS_ONE[3], FAMILIES[1], QUALIFIERS_ONE[0], VALUES[0]),
+        new KeyValue(ROWS_ONE[3], FAMILIES[1], QUALIFIERS_ONE[3], VALUES[0]),
+        // testRowTwo-0
+        new KeyValue(ROWS_TWO[0], FAMILIES[0], QUALIFIERS_TWO[0], VALUES[1]),
+        new KeyValue(ROWS_TWO[0], FAMILIES[0], QUALIFIERS_TWO[3], VALUES[1]),
+        new KeyValue(ROWS_TWO[0], FAMILIES[1], QUALIFIERS_TWO[0], VALUES[1]),
+        new KeyValue(ROWS_TWO[0], FAMILIES[1], QUALIFIERS_TWO[3], VALUES[1]),
+        // testRowTwo-2
+        new KeyValue(ROWS_TWO[2], FAMILIES[0], QUALIFIERS_TWO[0], VALUES[1]),
+        new KeyValue(ROWS_TWO[2], FAMILIES[0], QUALIFIERS_TWO[3], VALUES[1]),
+        new KeyValue(ROWS_TWO[2], FAMILIES[1], QUALIFIERS_TWO[0], VALUES[1]),
+        new KeyValue(ROWS_TWO[2], FAMILIES[1], QUALIFIERS_TWO[3], VALUES[1]),
+        // testRowTwo-3
+        new KeyValue(ROWS_TWO[3], FAMILIES[0], QUALIFIERS_TWO[0], VALUES[1]),
+        new KeyValue(ROWS_TWO[3], FAMILIES[0], QUALIFIERS_TWO[3], VALUES[1]),
+        new KeyValue(ROWS_TWO[3], FAMILIES[1], QUALIFIERS_TWO[0], VALUES[1]),
+        new KeyValue(ROWS_TWO[3], FAMILIES[1], QUALIFIERS_TWO[3], VALUES[1]),
+    };
+    verifyScanFull(s, kvs);
+     
+  }
+  
+  /**
+   * Exercises {@link RowFilter} with binary and regex comparators.
+   * The first group of scans only checks row and per-row key counts via
+   * verifyScanNoEarlyOut; the last two scans compare every returned
+   * KeyValue, in order, against an explicit expected array via verifyScanFull.
+   * @throws IOException
+   */
+  public void testRowFilter() throws IOException {
+
+    // Match a single row, all keys
+    long expectedRows = 1;
+    long expectedKeys = this.colsPerRow;
+    Filter f = new RowFilter(CompareOp.EQUAL,
+        new BinaryComparator(Bytes.toBytes("testRowOne-2")));
+    Scan s = new Scan();
+    s.setFilter(f);
+    verifyScanNoEarlyOut(s, expectedRows, expectedKeys);
+    
+    // Match two rows, one from each group, using regex
+    expectedRows = 2;
+    expectedKeys = this.colsPerRow;
+    f = new RowFilter(CompareOp.EQUAL,
+        new RegexStringComparator("testRow.+-2"));
+    s = new Scan();
+    s.setFilter(f);
+    verifyScanNoEarlyOut(s, expectedRows, expectedKeys);
+    
+    // Match rows less than testRowOne-2
+    // Expect all keys in one row
+    expectedRows = 1;
+    expectedKeys = this.colsPerRow;
+    f = new RowFilter(CompareOp.LESS,
+        new BinaryComparator(Bytes.toBytes("testRowOne-2")));
+    s = new Scan();
+    s.setFilter(f);
+    verifyScanNoEarlyOut(s, expectedRows, expectedKeys);
+    
+    // Match rows less than or equal to testRowOne-2
+    // Expect all keys in two rows
+    expectedRows = 2;
+    expectedKeys = this.colsPerRow;
+    f = new RowFilter(CompareOp.LESS_OR_EQUAL,
+        new BinaryComparator(Bytes.toBytes("testRowOne-2")));
+    s = new Scan();
+    s.setFilter(f);
+    verifyScanNoEarlyOut(s, expectedRows, expectedKeys);
+    
+    // Match rows not equal to testRowOne-2
+    // Expect all keys in all but one row
+    expectedRows = this.numRows - 1;
+    expectedKeys = this.colsPerRow;
+    f = new RowFilter(CompareOp.NOT_EQUAL,
+        new BinaryComparator(Bytes.toBytes("testRowOne-2")));
+    s = new Scan();
+    s.setFilter(f);
+    verifyScanNoEarlyOut(s, expectedRows, expectedKeys);
+    
+    // Match rows greater than or equal to testRowOne-2
+    // Expect all keys in all but one row
+    expectedRows = this.numRows - 1;
+    expectedKeys = this.colsPerRow;
+    f = new RowFilter(CompareOp.GREATER_OR_EQUAL,
+        new BinaryComparator(Bytes.toBytes("testRowOne-2")));
+    s = new Scan();
+    s.setFilter(f);
+    verifyScanNoEarlyOut(s, expectedRows, expectedKeys);
+    
+    // Match rows greater than testRowOne-2
+    // Expect all keys in all but two rows
+    expectedRows = this.numRows - 2;
+    expectedKeys = this.colsPerRow;
+    f = new RowFilter(CompareOp.GREATER,
+        new BinaryComparator(Bytes.toBytes("testRowOne-2")));
+    s = new Scan();
+    s.setFilter(f);
+    verifyScanNoEarlyOut(s, expectedRows, expectedKeys);
+    
+    // Match rows not equal to testRowOne-2
+    // Look across rows and fully validate the keys and ordering
+    // Should see all keys in all rows but testRowOne-2
+    f = new RowFilter(CompareOp.NOT_EQUAL,
+        new BinaryComparator(Bytes.toBytes("testRowOne-2")));
+    s = new Scan();
+    s.setFilter(f);
+    
+    KeyValue [] kvs = {
+        // testRowOne-0
+        new KeyValue(ROWS_ONE[0], FAMILIES[0], QUALIFIERS_ONE[0], VALUES[0]),
+        new KeyValue(ROWS_ONE[0], FAMILIES[0], QUALIFIERS_ONE[2], VALUES[0]),
+        new KeyValue(ROWS_ONE[0], FAMILIES[0], QUALIFIERS_ONE[3], VALUES[0]),
+        new KeyValue(ROWS_ONE[0], FAMILIES[1], QUALIFIERS_ONE[0], VALUES[0]),
+        new KeyValue(ROWS_ONE[0], FAMILIES[1], QUALIFIERS_ONE[2], VALUES[0]),
+        new KeyValue(ROWS_ONE[0], FAMILIES[1], QUALIFIERS_ONE[3], VALUES[0]),
+        // testRowOne-3
+        new KeyValue(ROWS_ONE[3], FAMILIES[0], QUALIFIERS_ONE[0], VALUES[0]),
+        new KeyValue(ROWS_ONE[3], FAMILIES[0], QUALIFIERS_ONE[2], VALUES[0]),
+        new KeyValue(ROWS_ONE[3], FAMILIES[0], QUALIFIERS_ONE[3], VALUES[0]),
+        new KeyValue(ROWS_ONE[3], FAMILIES[1], QUALIFIERS_ONE[0], VALUES[0]),
+        new KeyValue(ROWS_ONE[3], FAMILIES[1], QUALIFIERS_ONE[2], VALUES[0]),
+        new KeyValue(ROWS_ONE[3], FAMILIES[1], QUALIFIERS_ONE[3], VALUES[0]),
+        // testRowTwo-0
+        new KeyValue(ROWS_TWO[0], FAMILIES[0], QUALIFIERS_TWO[0], VALUES[1]),
+        new KeyValue(ROWS_TWO[0], FAMILIES[0], QUALIFIERS_TWO[2], VALUES[1]),
+        new KeyValue(ROWS_TWO[0], FAMILIES[0], QUALIFIERS_TWO[3], VALUES[1]),
+        new KeyValue(ROWS_TWO[0], FAMILIES[1], QUALIFIERS_TWO[0], VALUES[1]),
+        new KeyValue(ROWS_TWO[0], FAMILIES[1], QUALIFIERS_TWO[2], VALUES[1]),
+        new KeyValue(ROWS_TWO[0], FAMILIES[1], QUALIFIERS_TWO[3], VALUES[1]),
+        // testRowTwo-2
+        new KeyValue(ROWS_TWO[2], FAMILIES[0], QUALIFIERS_TWO[0], VALUES[1]),
+        new KeyValue(ROWS_TWO[2], FAMILIES[0], QUALIFIERS_TWO[2], VALUES[1]),
+        new KeyValue(ROWS_TWO[2], FAMILIES[0], QUALIFIERS_TWO[3], VALUES[1]),
+        new KeyValue(ROWS_TWO[2], FAMILIES[1], QUALIFIERS_TWO[0], VALUES[1]),
+        new KeyValue(ROWS_TWO[2], FAMILIES[1], QUALIFIERS_TWO[2], VALUES[1]),
+        new KeyValue(ROWS_TWO[2], FAMILIES[1], QUALIFIERS_TWO[3], VALUES[1]),
+        // testRowTwo-3
+        new KeyValue(ROWS_TWO[3], FAMILIES[0], QUALIFIERS_TWO[0], VALUES[1]),
+        new KeyValue(ROWS_TWO[3], FAMILIES[0], QUALIFIERS_TWO[2], VALUES[1]),
+        new KeyValue(ROWS_TWO[3], FAMILIES[0], QUALIFIERS_TWO[3], VALUES[1]),
+        new KeyValue(ROWS_TWO[3], FAMILIES[1], QUALIFIERS_TWO[0], VALUES[1]),
+        new KeyValue(ROWS_TWO[3], FAMILIES[1], QUALIFIERS_TWO[2], VALUES[1]),
+        new KeyValue(ROWS_TWO[3], FAMILIES[1], QUALIFIERS_TWO[3], VALUES[1]),
+    };
+    verifyScanFull(s, kvs);
+     
+    
+    // Test across rows and groups with a regex
+    // Filter out everything that doesn't match "*-2"
+    // Expect all keys in two rows
+    f = new RowFilter(CompareOp.EQUAL,
+        new RegexStringComparator(".+-2"));
+    s = new Scan();
+    s.setFilter(f);
+    
+    kvs = new KeyValue [] {
+        // testRowOne-2
+        new KeyValue(ROWS_ONE[2], FAMILIES[0], QUALIFIERS_ONE[0], VALUES[0]),
+        new KeyValue(ROWS_ONE[2], FAMILIES[0], QUALIFIERS_ONE[2], VALUES[0]),
+        new KeyValue(ROWS_ONE[2], FAMILIES[0], QUALIFIERS_ONE[3], VALUES[0]),
+        new KeyValue(ROWS_ONE[2], FAMILIES[1], QUALIFIERS_ONE[0], VALUES[0]),
+        new KeyValue(ROWS_ONE[2], FAMILIES[1], QUALIFIERS_ONE[2], VALUES[0]),
+        new KeyValue(ROWS_ONE[2], FAMILIES[1], QUALIFIERS_ONE[3], VALUES[0]),
+        // testRowTwo-2
+        new KeyValue(ROWS_TWO[2], FAMILIES[0], QUALIFIERS_TWO[0], VALUES[1]),
+        new KeyValue(ROWS_TWO[2], FAMILIES[0], QUALIFIERS_TWO[2], VALUES[1]),
+        new KeyValue(ROWS_TWO[2], FAMILIES[0], QUALIFIERS_TWO[3], VALUES[1]),
+        new KeyValue(ROWS_TWO[2], FAMILIES[1], QUALIFIERS_TWO[0], VALUES[1]),
+        new KeyValue(ROWS_TWO[2], FAMILIES[1], QUALIFIERS_TWO[2], VALUES[1]),
+        new KeyValue(ROWS_TWO[2], FAMILIES[1], QUALIFIERS_TWO[3], VALUES[1])
+    };
+    verifyScanFull(s, kvs);
+     
+  }
+  
+  /**
+   * Exercises {@link ValueFilter} with binary and regex comparators against
+   * the two values "testValueOne" and "testValueTwo".
+   * Most scans only check row and per-row key counts via
+   * verifyScanNoEarlyOut; the final scan compares every returned KeyValue,
+   * in order, against an explicit expected array via verifyScanFull.
+   * @throws IOException
+   */
+  public void testValueFilter() throws IOException {
+    
+    // Match group one rows
+    long expectedRows = this.numRows / 2;
+    long expectedKeys = this.colsPerRow;
+    Filter f = new ValueFilter(CompareOp.EQUAL,
+        new BinaryComparator(Bytes.toBytes("testValueOne")));
+    Scan s = new Scan();
+    s.setFilter(f);
+    verifyScanNoEarlyOut(s, expectedRows, expectedKeys);
+
+    // Match group two rows
+    expectedRows = this.numRows / 2;
+    expectedKeys = this.colsPerRow;
+    f = new ValueFilter(CompareOp.EQUAL,
+        new BinaryComparator(Bytes.toBytes("testValueTwo")));
+    s = new Scan();
+    s.setFilter(f);
+    verifyScanNoEarlyOut(s, expectedRows, expectedKeys);
+    
+    // Match all values using regex
+    expectedRows = this.numRows;
+    expectedKeys = this.colsPerRow;
+    f = new ValueFilter(CompareOp.EQUAL,
+        new RegexStringComparator("testValue((One)|(Two))"));
+    s = new Scan();
+    s.setFilter(f);
+    verifyScanNoEarlyOut(s, expectedRows, expectedKeys);
+    
+    // Match values less than testValueTwo
+    // Expect group one rows
+    expectedRows = this.numRows / 2;
+    expectedKeys = this.colsPerRow;
+    f = new ValueFilter(CompareOp.LESS,
+        new BinaryComparator(Bytes.toBytes("testValueTwo")));
+    s = new Scan();
+    s.setFilter(f);
+    verifyScanNoEarlyOut(s, expectedRows, expectedKeys);
+    
+    // Match values less than or equal to testValueTwo
+    // Expect all rows
+    expectedRows = this.numRows;
+    expectedKeys = this.colsPerRow;
+    f = new ValueFilter(CompareOp.LESS_OR_EQUAL,
+        new BinaryComparator(Bytes.toBytes("testValueTwo")));
+    s = new Scan();
+    s.setFilter(f);
+    verifyScanNoEarlyOut(s, expectedRows, expectedKeys);
+
+    // Match values less than or equal to testValueOne
+    // Expect group one rows
+    expectedRows = this.numRows / 2;
+    expectedKeys = this.colsPerRow;
+    f = new ValueFilter(CompareOp.LESS_OR_EQUAL,
+        new BinaryComparator(Bytes.toBytes("testValueOne")));
+    s = new Scan();
+    s.setFilter(f);
+    verifyScanNoEarlyOut(s, expectedRows, expectedKeys);
+    
+    // Match values not equal to testValueOne
+    // Expect half the rows
+    expectedRows = this.numRows / 2;
+    expectedKeys = this.colsPerRow;
+    f = new ValueFilter(CompareOp.NOT_EQUAL,
+        new BinaryComparator(Bytes.toBytes("testValueOne")));
+    s = new Scan();
+    s.setFilter(f);
+    verifyScanNoEarlyOut(s, expectedRows, expectedKeys);
+    
+    // Match values greater than or equal to testValueOne
+    // Expect all rows
+    expectedRows = this.numRows;
+    expectedKeys = this.colsPerRow;
+    f = new ValueFilter(CompareOp.GREATER_OR_EQUAL,
+        new BinaryComparator(Bytes.toBytes("testValueOne")));
+    s = new Scan();
+    s.setFilter(f);
+    verifyScanNoEarlyOut(s, expectedRows, expectedKeys);
+    
+    // Match values greater than testValueOne
+    // Expect half the rows
+    expectedRows = this.numRows / 2;
+    expectedKeys = this.colsPerRow;
+    f = new ValueFilter(CompareOp.GREATER,
+        new BinaryComparator(Bytes.toBytes("testValueOne")));
+    s = new Scan();
+    s.setFilter(f);
+    verifyScanNoEarlyOut(s, expectedRows, expectedKeys);
+    
+    // Match values not equal to testValueOne
+    // Look across rows and fully validate the keys and ordering
+    // Should see all keys in all group two rows
+    f = new ValueFilter(CompareOp.NOT_EQUAL,
+        new BinaryComparator(Bytes.toBytes("testValueOne")));
+    s = new Scan();
+    s.setFilter(f);
+    
+    KeyValue [] kvs = {
+        // testRowTwo-0
+        new KeyValue(ROWS_TWO[0], FAMILIES[0], QUALIFIERS_TWO[0], VALUES[1]),
+        new KeyValue(ROWS_TWO[0], FAMILIES[0], QUALIFIERS_TWO[2], VALUES[1]),
+        new KeyValue(ROWS_TWO[0], FAMILIES[0], QUALIFIERS_TWO[3], VALUES[1]),
+        new KeyValue(ROWS_TWO[0], FAMILIES[1], QUALIFIERS_TWO[0], VALUES[1]),
+        new KeyValue(ROWS_TWO[0], FAMILIES[1], QUALIFIERS_TWO[2], VALUES[1]),
+        new KeyValue(ROWS_TWO[0], FAMILIES[1], QUALIFIERS_TWO[3], VALUES[1]),
+        // testRowTwo-2
+        new KeyValue(ROWS_TWO[2], FAMILIES[0], QUALIFIERS_TWO[0], VALUES[1]),
+        new KeyValue(ROWS_TWO[2], FAMILIES[0], QUALIFIERS_TWO[2], VALUES[1]),
+        new KeyValue(ROWS_TWO[2], FAMILIES[0], QUALIFIERS_TWO[3], VALUES[1]),
+        new KeyValue(ROWS_TWO[2], FAMILIES[1], QUALIFIERS_TWO[0], VALUES[1]),
+        new KeyValue(ROWS_TWO[2], FAMILIES[1], QUALIFIERS_TWO[2], VALUES[1]),
+        new KeyValue(ROWS_TWO[2], FAMILIES[1], QUALIFIERS_TWO[3], VALUES[1]),
+        // testRowTwo-3
+        new KeyValue(ROWS_TWO[3], FAMILIES[0], QUALIFIERS_TWO[0], VALUES[1]),
+        new KeyValue(ROWS_TWO[3], FAMILIES[0], QUALIFIERS_TWO[2], VALUES[1]),
+        new KeyValue(ROWS_TWO[3], FAMILIES[0], QUALIFIERS_TWO[3], VALUES[1]),
+        new KeyValue(ROWS_TWO[3], FAMILIES[1], QUALIFIERS_TWO[0], VALUES[1]),
+        new KeyValue(ROWS_TWO[3], FAMILIES[1], QUALIFIERS_TWO[2], VALUES[1]),
+        new KeyValue(ROWS_TWO[3], FAMILIES[1], QUALIFIERS_TWO[3], VALUES[1]),
+    };
+    verifyScanFull(s, kvs);
+  }
+  
+  /**
+   * Wraps a NOT_EQUAL {@link QualifierFilter} in a {@link SkipFilter} and
+   * verifies, key by key and in order, that only the group two rows come
+   * back, each with all of its keys.
+   * @throws IOException
+   */
+  public void testSkipFilter() throws IOException {
+    
+    // Test for an exact (binary) qualifier match on "testQualifierOne-2"
+    // Should only get rows from second group, and all keys
+    // NOTE(review): presumably SkipFilter drops a whole row as soon as the
+    // wrapped filter rejects one of its keys, which is why every group one
+    // row disappears -- confirm against SkipFilter
+    Filter f = new SkipFilter(new QualifierFilter(CompareOp.NOT_EQUAL,
+        new BinaryComparator(Bytes.toBytes("testQualifierOne-2"))));
+    Scan s = new Scan();
+    s.setFilter(f);
+    
+    KeyValue [] kvs = {
+        // testRowTwo-0
+        new KeyValue(ROWS_TWO[0], FAMILIES[0], QUALIFIERS_TWO[0], VALUES[1]),
+        new KeyValue(ROWS_TWO[0], FAMILIES[0], QUALIFIERS_TWO[2], VALUES[1]),
+        new KeyValue(ROWS_TWO[0], FAMILIES[0], QUALIFIERS_TWO[3], VALUES[1]),
+        new KeyValue(ROWS_TWO[0], FAMILIES[1], QUALIFIERS_TWO[0], VALUES[1]),
+        new KeyValue(ROWS_TWO[0], FAMILIES[1], QUALIFIERS_TWO[2], VALUES[1]),
+        new KeyValue(ROWS_TWO[0], FAMILIES[1], QUALIFIERS_TWO[3], VALUES[1]),
+        // testRowTwo-2
+        new KeyValue(ROWS_TWO[2], FAMILIES[0], QUALIFIERS_TWO[0], VALUES[1]),
+        new KeyValue(ROWS_TWO[2], FAMILIES[0], QUALIFIERS_TWO[2], VALUES[1]),
+        new KeyValue(ROWS_TWO[2], FAMILIES[0], QUALIFIERS_TWO[3], VALUES[1]),
+        new KeyValue(ROWS_TWO[2], FAMILIES[1], QUALIFIERS_TWO[0], VALUES[1]),
+        new KeyValue(ROWS_TWO[2], FAMILIES[1], QUALIFIERS_TWO[2], VALUES[1]),
+        new KeyValue(ROWS_TWO[2], FAMILIES[1], QUALIFIERS_TWO[3], VALUES[1]),
+        // testRowTwo-3
+        new KeyValue(ROWS_TWO[3], FAMILIES[0], QUALIFIERS_TWO[0], VALUES[1]),
+        new KeyValue(ROWS_TWO[3], FAMILIES[0], QUALIFIERS_TWO[2], VALUES[1]),
+        new KeyValue(ROWS_TWO[3], FAMILIES[0], QUALIFIERS_TWO[3], VALUES[1]),
+        new KeyValue(ROWS_TWO[3], FAMILIES[1], QUALIFIERS_TWO[0], VALUES[1]),
+        new KeyValue(ROWS_TWO[3], FAMILIES[1], QUALIFIERS_TWO[2], VALUES[1]),
+        new KeyValue(ROWS_TWO[3], FAMILIES[1], QUALIFIERS_TWO[3], VALUES[1]),
+    };
+    verifyScanFull(s, kvs);
+  }
+    
+  // TODO: This is important... need many more tests for ordering, etc
+  // There are limited tests elsewhere but we need HRegion level ones here
+  /**
+   * Combines a {@link RowFilter}, {@link QualifierFilter} and
+   * {@link ValueFilter} in a {@link FilterList}, once with MUST_PASS_ALL
+   * (expecting exactly one KeyValue) and once with MUST_PASS_ONE (expecting
+   * every row with all of its keys).
+   * @throws IOException
+   */
+  public void testFilterList() throws IOException {
+    
+    // Test getting a single row, single key using Row, Qualifier, and Value 
+    // regular expression and substring filters
+    // Use must pass all
+    List<Filter> filters = new ArrayList<Filter>();
+    filters.add(new RowFilter(CompareOp.EQUAL, new RegexStringComparator(".+-2")));
+    filters.add(new QualifierFilter(CompareOp.EQUAL, new RegexStringComparator(".+-2")));
+    filters.add(new ValueFilter(CompareOp.EQUAL, new SubstringComparator("One")));
+    Filter f = new FilterList(Operator.MUST_PASS_ALL, filters);
+    Scan s = new Scan();
+    // Restrict the scan to the first family so only one key can match
+    s.addFamily(FAMILIES[0]);
+    s.setFilter(f);
+    KeyValue [] kvs = {
+        new KeyValue(ROWS_ONE[2], FAMILIES[0], QUALIFIERS_ONE[2], VALUES[0])
+    };
+    verifyScanFull(s, kvs);
+
+    // Test getting everything with a MUST_PASS_ONE filter including row, qf, val
+    // regular expression and substring filters
+    // The same list instance is emptied and refilled; the first scan above
+    // has already completed by this point
+    filters.clear();
+    filters.add(new RowFilter(CompareOp.EQUAL, new RegexStringComparator(".+Two.+")));
+    filters.add(new QualifierFilter(CompareOp.EQUAL, new RegexStringComparator(".+-2")));
+    filters.add(new ValueFilter(CompareOp.EQUAL, new SubstringComparator("One")));
+    f = new FilterList(Operator.MUST_PASS_ONE, filters);
+    s = new Scan();
+    s.setFilter(f);
+    verifyScanNoEarlyOut(s, this.numRows, this.colsPerRow);
+    
+    
+  }
+  
+  /**
+   * Runs the scan against the test region and checks the number of scanner
+   * batches and the number of keys in each batch.
+   * Every call to <code>next</code> is counted as a row, including the call
+   * that reports there is nothing more to read, and every one of those calls
+   * must return exactly <code>expectedKeys</code> KeyValues.
+   * @param s scan to run, with any filter already set
+   * @param expectedRows number of <code>next</code> calls expected in total
+   * @param expectedKeys number of KeyValues expected from each call
+   * @throws IOException if the scanner fails
+   */
+  private void verifyScan(Scan s, long expectedRows, long expectedKeys)
+  throws IOException {
+    InternalScanner scanner = this.region.getScanner(s);
+    try {
+      List<KeyValue> results = new ArrayList<KeyValue>();
+      int i = 0;
+      // scanner.next() returns whether more rows remain, so loop while true
+      for (boolean more = true; more; i++) {
+        more = scanner.next(results);
+        // Results are used exactly as the scanner returned them. An earlier
+        // Arrays.sort() here only sorted a throw-away copy from toArray().
+        LOG.info("counter=" + i + ", " + results);
+        assertTrue("Scanned too many rows! Only expected " + expectedRows + 
+            " total but already scanned " + (i+1), expectedRows > i);
+        assertEquals("Expected " + expectedKeys + " keys per row but " +
+            "returned " + results.size(), expectedKeys, results.size());
+        results.clear();
+      }
+      assertEquals("Expected " + expectedRows + " rows but scanned " + i +
+          " rows", expectedRows, i);
+    } finally {
+      // Close the scanner even when an assertion above fails
+      scanner.close();
+    }
+  }
+
+
+  
+  /**
+   * Runs the scan against the test region and checks the number of non-empty
+   * scanner batches and the number of keys in each batch.
+   * Unlike verifyScan, the loop stops at the first empty batch without
+   * counting it, so a trailing empty <code>next</code> call is tolerated.
+   * @param s scan to run, with any filter already set
+   * @param expectedRows number of non-empty batches expected
+   * @param expectedKeys number of KeyValues expected in each batch
+   * @throws IOException if the scanner fails
+   */
+  private void verifyScanNoEarlyOut(Scan s, long expectedRows, 
+      long expectedKeys) 
+  throws IOException {
+    InternalScanner scanner = this.region.getScanner(s);
+    try {
+      List<KeyValue> results = new ArrayList<KeyValue>();
+      int i = 0;
+      // scanner.next() returns whether more rows remain, so loop while true
+      for (boolean more = true; more; i++) {
+        more = scanner.next(results);
+        // Results are used exactly as the scanner returned them. An earlier
+        // Arrays.sort() here only sorted a throw-away copy from toArray().
+        LOG.info("counter=" + i + ", " + results);
+        if (results.isEmpty()) {
+          // Leaves the loop before i++ runs, so the empty batch is not counted
+          break;
+        }
+        assertTrue("Scanned too many rows! Only expected " + expectedRows + 
+            " total but already scanned " + (i+1), expectedRows > i);
+        assertEquals("Expected " + expectedKeys + " keys per row but " +
+            "returned " + results.size(), expectedKeys, results.size());
+        results.clear();
+      }
+      assertEquals("Expected " + expectedRows + " rows but scanned " + i +
+          " rows", expectedRows, i);
+    } finally {
+      // Close the scanner even when an assertion above fails
+      scanner.close();
+    }
+  }
+
+  /**
+   * Runs the scan against the test region and compares every returned
+   * KeyValue, in the order the scanner produced it, with the expected array.
+   * Row, family, qualifier and value are compared; the total number of keys
+   * must equal <code>kvs.length</code>.
+   * @param s scan to run, with any filter already set
+   * @param kvs expected KeyValues in expected scan order
+   * @throws IOException if the scanner fails
+   */
+  private void verifyScanFull(Scan s, KeyValue [] kvs)
+  throws IOException {
+    InternalScanner scanner = this.region.getScanner(s);
+    try {
+      List<KeyValue> results = new ArrayList<KeyValue>();
+      int row = 0;
+      int idx = 0;
+      // scanner.next() returns whether more rows remain, so loop while true
+      for (boolean more = true; more; row++) {
+        more = scanner.next(results);
+        // Deliberately no sorting: this helper validates scanner ordering.
+        // An earlier Arrays.sort() here only sorted a throw-away copy made by
+        // toArray() and never touched this list.
+        // Guard the kvs[idx] lookups below against running off the array
+        assertTrue("Scanned too many keys! Only expected " + kvs.length + 
+            " total but already scanned " + (results.size() + idx), 
+            kvs.length >= idx + results.size());
+        for (KeyValue kv : results) {
+          KeyValue expected = kvs[idx];
+          LOG.info("row=" + row + ", result=" + kv.toString() + 
+              ", match=" + expected.toString());
+          assertTrue("Row mismatch", 
+              Bytes.equals(kv.getRow(), expected.getRow()));
+          assertTrue("Family mismatch", 
+              Bytes.equals(kv.getFamily(), expected.getFamily()));
+          assertTrue("Qualifier mismatch", 
+              Bytes.equals(kv.getQualifier(), expected.getQualifier()));
+          assertTrue("Value mismatch", 
+              Bytes.equals(kv.getValue(), expected.getValue()));
+          idx++;
+        }
+        results.clear();
+      }
+      LOG.info("Looked at " + row + " rows with " + idx + " keys");
+      assertEquals("Expected " + kvs.length + " total keys but scanned " + idx,
+          kvs.length, idx);
+    } finally {
+      // Close the scanner even when an assertion above fails
+      scanner.close();
+    }
+  }
+}

Modified: hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/filter/TestFilterList.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/filter/TestFilterList.java?rev=810732&r1=810731&r2=810732&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/filter/TestFilterList.java (original)
+++ hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/filter/TestFilterList.java Wed Sep  2 23:23:21 2009
@@ -72,6 +72,7 @@
     byte [] rowkey = Bytes.toBytes("yyyyyyyyy");
     for (int i = 0; i < MAX_PAGES - 1; i++) {
       assertFalse(filterMPONE.filterRowKey(rowkey, 0, rowkey.length));
+      assertFalse(filterMPONE.filterRow());
       KeyValue kv = new KeyValue(rowkey, rowkey, Bytes.toBytes(i),
         Bytes.toBytes(i));
       assertTrue(Filter.ReturnCode.INCLUDE == filterMPONE.filterKeyValue(kv));
@@ -80,6 +81,7 @@
     /* Only pass PageFilter */
     rowkey = Bytes.toBytes("z");
     assertFalse(filterMPONE.filterRowKey(rowkey, 0, rowkey.length));
+    assertFalse(filterMPONE.filterRow());
     KeyValue kv = new KeyValue(rowkey, rowkey, Bytes.toBytes(0),
         Bytes.toBytes(0));
     assertTrue(Filter.ReturnCode.INCLUDE == filterMPONE.filterKeyValue(kv));
@@ -87,19 +89,16 @@
     /* PageFilter will fail now, but should pass because we match yyy */
     rowkey = Bytes.toBytes("yyy");
     assertFalse(filterMPONE.filterRowKey(rowkey, 0, rowkey.length));
+    assertFalse(filterMPONE.filterRow());
     kv = new KeyValue(rowkey, rowkey, Bytes.toBytes(0),
         Bytes.toBytes(0));
     assertTrue(Filter.ReturnCode.INCLUDE == filterMPONE.filterKeyValue(kv));
     
-    /* We should filter the row key now if we match neither */
-    rowkey = Bytes.toBytes("x");
+    /* We should filter any row */
+    rowkey = Bytes.toBytes("z");
     assertTrue(filterMPONE.filterRowKey(rowkey, 0, rowkey.length));
-    kv = new KeyValue(rowkey, rowkey, Bytes.toBytes(0),
-        Bytes.toBytes(0));
-    assertTrue(Filter.ReturnCode.SKIP == filterMPONE.filterKeyValue(kv));
-    
-    // Both filters in Set should be satisfied by now
     assertTrue(filterMPONE.filterRow());
+    assertTrue(filterMPONE.filterAllRemaining());
 
   }
 
@@ -153,6 +152,7 @@
     List<Filter> filters = new ArrayList<Filter>();
     filters.add(new PrefixFilter(Bytes.toBytes("yyy")));
     filters.add(new PageFilter(MAX_PAGES));
+    RegexStringComparator rsc;
     Filter filterMPONE =
         new FilterList(FilterList.Operator.MUST_PASS_ONE, filters);
     /* Filter must do all below steps:
@@ -171,21 +171,23 @@
     assertFalse(filterMPONE.filterAllRemaining());
     
     /* We should be able to fill MAX_PAGES without incrementing page counter */
-    byte [] rowkey = Bytes.toBytes("yyyyyyyyy");
+    byte [] rowkey = Bytes.toBytes("yyyyyyyy");
     for (int i = 0; i < MAX_PAGES; i++) {
       assertFalse(filterMPONE.filterRowKey(rowkey, 0, rowkey.length));
       KeyValue kv = new KeyValue(rowkey, rowkey, Bytes.toBytes(i),
-        Bytes.toBytes(i));
-      assertTrue(Filter.ReturnCode.INCLUDE == filterMPONE.filterKeyValue(kv));
+          Bytes.toBytes(i));
+        assertTrue(Filter.ReturnCode.INCLUDE == filterMPONE.filterKeyValue(kv));
+      assertFalse(filterMPONE.filterRow());
     }
     
     /* Now let's fill the page filter */
-    rowkey = Bytes.toBytes("zzzzzzzz");
+    rowkey = Bytes.toBytes("xxxxxxx");
     for (int i = 0; i < MAX_PAGES; i++) {
       assertFalse(filterMPONE.filterRowKey(rowkey, 0, rowkey.length));
       KeyValue kv = new KeyValue(rowkey, rowkey, Bytes.toBytes(i),
-        Bytes.toBytes(i));
-      assertTrue(Filter.ReturnCode.INCLUDE == filterMPONE.filterKeyValue(kv));
+          Bytes.toBytes(i));
+        assertTrue(Filter.ReturnCode.INCLUDE == filterMPONE.filterKeyValue(kv));
+      assertFalse(filterMPONE.filterRow());
     }
     
     /* We should still be able to include even though page filter is at max */
@@ -193,14 +195,10 @@
     for (int i = 0; i < MAX_PAGES; i++) {
       assertFalse(filterMPONE.filterRowKey(rowkey, 0, rowkey.length));
       KeyValue kv = new KeyValue(rowkey, rowkey, Bytes.toBytes(i),
-        Bytes.toBytes(i));
-      assertTrue(Filter.ReturnCode.INCLUDE == filterMPONE.filterKeyValue(kv));
+          Bytes.toBytes(i));
+        assertTrue(Filter.ReturnCode.INCLUDE == filterMPONE.filterKeyValue(kv));
+      assertFalse(filterMPONE.filterRow());
     }
-    
-    /* We should filter the row key now if we don't match neither */
-    rowkey = Bytes.toBytes("x");
-    assertTrue(filterMPONE.filterRowKey(rowkey, 0, rowkey.length));
-    
   }
 
   /**

Modified: hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/filter/TestInclusiveStopFilter.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/filter/TestInclusiveStopFilter.java?rev=810732&r1=810731&r2=810732&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/filter/TestInclusiveStopFilter.java (original)
+++ hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/filter/TestInclusiveStopFilter.java Wed Sep  2 23:23:21 2009
@@ -83,7 +83,7 @@
     assertTrue("Filtering on " + Bytes.toString(PAST_STOP_ROW),
       filter.filterRowKey(PAST_STOP_ROW, 0, PAST_STOP_ROW.length));
 
-    assertFalse("FilterAllRemaining", filter.filterAllRemaining());
+    assertTrue("FilterAllRemaining", filter.filterAllRemaining());
     assertFalse("FilterNotNull", filter.filterRow());
 
     assertFalse("Filter a null", filter.filterRowKey(null, 0, 0));