You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/09/22 22:45:06 UTC

[08/15] git commit: ACCUMULO-652 added column visibility block indexing and unit test, added Filterer interface to the system iterators, made VisibilityFilter use the new filtering capability, added notion of optional filtering to the Filterer interface

ACCUMULO-652 added column visibility block indexing and unit test, added Filterer interface to the system iterators, made VisibilityFilter use the new filtering capability, added notion of optional filtering to the Filterer interface

git-svn-id: https://svn.apache.org/repos/asf/accumulo/branches/ACCUMULO-652@1355353 13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/af8cefa7
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/af8cefa7
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/af8cefa7

Branch: refs/heads/ACCUMULO-652
Commit: af8cefa7acd13f5241078bcf70dfbcc20aa5a935
Parents: 727e61e
Author: Adam Fuchs <af...@apache.org>
Authored: Fri Jun 29 13:11:48 2012 +0000
Committer: Adam Fuchs <af...@apache.org>
Committed: Fri Jun 29 13:11:48 2012 +0000

----------------------------------------------------------------------
 .../accumulo/core/file/rfile/BlockStats.java    | 117 +++++++++++++++++++
 .../apache/accumulo/core/file/rfile/RFile.java  |  22 ++--
 .../accumulo/core/iterators/Filterer.java       |  11 +-
 .../core/iterators/WrappingIterator.java        |  11 +-
 .../predicates/ColumnVisibilityPredicate.java   |  26 +++++
 .../core/iterators/system/MultiIterator.java    |  15 ++-
 .../system/SourceSwitchingIterator.java         |  35 +++++-
 .../iterators/system/SynchronizedIterator.java  |  13 ++-
 .../iterators/system/TimeSettingIterator.java   |  13 ++-
 .../core/iterators/system/VisibilityFilter.java |  14 ++-
 .../file/rfile/AuthorizationFilterTest.java     | 109 +++++++++++++++++
 .../core/file/rfile/TimestampFilterTest.java    |   3 +-
 12 files changed, 370 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/af8cefa7/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockStats.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockStats.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockStats.java
new file mode 100644
index 0000000..d1b1eac
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockStats.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Aggregate statistics over a set of entries: entry count, minimum and maximum timestamp, and the OR of the
+ * entries' column visibilities. Serialized as a Hadoop Writable; only the RFile.RINDEX_VER_7 layout carries the
+ * timestamp and visibility fields.
+ */
+public class BlockStats implements Writable {
+
+  /** Visibility folded in for keys whose column visibility is empty. */
+  private static final ColumnVisibility EMPTY_VISIBILITY = new ColumnVisibility();
+  /** Longest visibility expression, in bytes, that write() serializes verbatim. */
+  private static final int MAX_VISIBILITY_LENGTH = 100;
+
+  long minTimestamp = Long.MAX_VALUE;
+  long maxTimestamp = Long.MIN_VALUE;
+  /** OR of every visibility folded in so far; null until the first one is recorded. */
+  ColumnVisibility minimumVisibility = null;
+  int entries = 0;
+  /** Index format version; selects the layout used by readFields() and write(). */
+  final int version;
+
+  /** Creates empty statistics in the RFile.RINDEX_VER_7 format. */
+  public BlockStats() {
+    this.version = RFile.RINDEX_VER_7;
+  }
+
+  /**
+   * Creates empty statistics for the given index format version.
+   *
+   * @param version index format version that readFields() and write() follow
+   */
+  public BlockStats(int version) {
+    this.version = version;
+  }
+
+  /**
+   * Creates statistics with explicit initial values in the RFile.RINDEX_VER_7 format.
+   *
+   * @param minTimestamp initial minimum timestamp
+   * @param maxTimestamp initial maximum timestamp
+   * @param minimumVisibility initial combined visibility; may be null
+   * @param entries initial entry count
+   */
+  public BlockStats(long minTimestamp, long maxTimestamp, ColumnVisibility minimumVisibility, int entries) {
+    this.minTimestamp = minTimestamp;
+    this.maxTimestamp = maxTimestamp;
+    this.minimumVisibility = minimumVisibility;
+    this.entries = entries;
+    this.version = RFile.RINDEX_VER_7;
+  }
+
+  /**
+   * Folds a single entry into these statistics.
+   *
+   * @param key entry key supplying the timestamp and column visibility
+   * @param value entry value; not consulted by this method
+   */
+  public void updateBlockStats(Key key, Value value) {
+    long timestamp = key.getTimestamp();
+    if (minTimestamp > timestamp)
+      minTimestamp = timestamp;
+    if (maxTimestamp < timestamp)
+      maxTimestamp = timestamp;
+    entries++;
+    if (key.getColumnVisibilityData().length() > 0)
+      combineVisibilities(new ColumnVisibility(key.getColumnVisibility()));
+    else
+      combineVisibilities(EMPTY_VISIBILITY);
+  }
+
+  /**
+   * Merges another set of statistics into this one.
+   *
+   * @param other statistics to absorb
+   */
+  public void updateBlockStats(BlockStats other) {
+    this.entries += other.entries;
+    if (this.minTimestamp > other.minTimestamp)
+      this.minTimestamp = other.minTimestamp;
+    if (this.maxTimestamp < other.maxTimestamp)
+      this.maxTimestamp = other.maxTimestamp;
+    // NOTE(review): other.minimumVisibility can be null; confirm ColumnVisibility.or tolerates a null argument
+    combineVisibilities(other.minimumVisibility);
+  }
+
+  /** Adopts the first visibility seen, then ORs each later one into the running value. */
+  private void combineVisibilities(ColumnVisibility other) {
+    if (minimumVisibility == null)
+      minimumVisibility = other;
+    else
+      minimumVisibility = minimumVisibility.or(other);
+  }
+
+  /**
+   * Reads the statistics in the layout selected by the version field. For RFile.RINDEX_VER_7 the two timestamps
+   * and a length-prefixed visibility expression precede the entry count; for any other version only the entry
+   * count is read, the timestamps are set to the widest possible range and the visibility to null.
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    if (version == RFile.RINDEX_VER_7) {
+      minTimestamp = in.readLong();
+      maxTimestamp = in.readLong();
+      int visibilityLength = in.readInt();
+      if (visibilityLength >= 0) {
+        byte[] visibility = new byte[visibilityLength];
+        in.readFully(visibility);
+        minimumVisibility = new ColumnVisibility(visibility);
+      } else {
+        // a negative length marks a null visibility
+        minimumVisibility = null;
+      }
+    } else {
+      // this layout stores no timestamps or visibility: fall back to the widest range
+      minTimestamp = Long.MIN_VALUE;
+      maxTimestamp = Long.MAX_VALUE;
+      minimumVisibility = null;
+    }
+    entries = in.readInt();
+  }
+
+  /**
+   * Writes the statistics in the layout selected by the version field. For RFile.RINDEX_VER_7 the two timestamps
+   * and a length-prefixed visibility expression (length -1 for a null visibility) precede the entry count; for
+   * any other version only the entry count is written. An expression longer than MAX_VISIBILITY_LENGTH bytes is
+   * replaced by a zero-length expression.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    if (version == RFile.RINDEX_VER_7) {
+      out.writeLong(minTimestamp);
+      out.writeLong(maxTimestamp);
+      if (minimumVisibility == null)
+        out.writeInt(-1);
+      else {
+        byte[] visibility = minimumVisibility.getExpression();
+        if (visibility.length > MAX_VISIBILITY_LENGTH) {
+          // NOTE(review): this diagnostic goes to stdout; consider routing it through the project's logger
+          System.out.println("expression too large: "+toString());
+          out.writeInt(0);
+        } else {
+          out.writeInt(visibility.length);
+          out.write(visibility);
+        }
+      }
+    }
+    out.writeInt(entries);
+  }
+
+  @Override
+  public String toString() {
+    return "{"+entries+";"+minTimestamp+";"+maxTimestamp+";"+minimumVisibility+"}";
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/af8cefa7/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
index 5e1e8a3..d250155 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
@@ -784,7 +784,9 @@ public class RFile {
      * @see org.apache.accumulo.core.iterators.Filterer#applyFilter(org.apache.accumulo.core.iterators.Predicate)
      */
     @Override
-    public void applyFilter(Predicate<Key,Value> filter) {
+    public void applyFilter(Predicate<Key,Value> filter, boolean required) {
+      if(required)
+        throw new UnsupportedOperationException("Cannot guarantee filtration");
       // TODO support general filters
       if(filter instanceof TimestampRangePredicate)
       {
@@ -797,16 +799,12 @@ public class RFile {
           timestampRange = p;
         index.setTimestampRange(timestampRange);
       }
-      else if(filter instanceof ColumnVisibilityPredicate)
+      if(filter instanceof ColumnVisibilityPredicate)
       {
     	  filterChanged = true;
     	  columnVisibilityPredicate = (ColumnVisibilityPredicate)filter;
     	  index.setColumnVisibilityPredicate(columnVisibilityPredicate);
       }
-      else
-      {
-        throw new RuntimeException("yikes, not yet implemented");
-      }
     }
   }
   
@@ -1042,7 +1040,9 @@ public class RFile {
         
         if (include) {
           if(timestampFilter != null)
-            lgr.applyFilter(timestampFilter);
+            lgr.applyFilter(timestampFilter,false);
+          if(columnVisibilityPredicate != null)
+            lgr.applyFilter(columnVisibilityPredicate,false);
           lgr.seek(range, EMPTY_CF_SET, false);
           addSource(lgr);
           numLGSeeked++;
@@ -1093,6 +1093,7 @@ public class RFile {
     ArrayList<Predicate<Key,Value>> filters = new ArrayList<Predicate<Key,Value>>();
     
     TimestampRangePredicate timestampFilter = null;
+    ColumnVisibilityPredicate columnVisibilityPredicate = null;
     
     Key topKey;
     Value topValue;
@@ -1171,11 +1172,14 @@ public class RFile {
      * @see org.apache.accumulo.core.iterators.Filterer#applyFilter(org.apache.accumulo.core.iterators.Predicate)
      */
     @Override
-    public void applyFilter(Predicate<Key,Value> filter) {
-      filters.add(filter);
+    public void applyFilter(Predicate<Key,Value> filter, boolean required) {
+      if(required) // only mandatory filters are recorded in the filters list
+        filters.add(filter);
       // the HeapIterator will pass this filter on to its children, a collection of LocalityGroupReaders
       if(filter instanceof TimestampRangePredicate)
         this.timestampFilter = (TimestampRangePredicate)filter;
+      if(filter instanceof ColumnVisibilityPredicate) // remembered regardless of the required flag
+        this.columnVisibilityPredicate = (ColumnVisibilityPredicate)filter;
     }
   }
   

http://git-wip-us.apache.org/repos/asf/accumulo/blob/af8cefa7/core/src/main/java/org/apache/accumulo/core/iterators/Filterer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/Filterer.java b/core/src/main/java/org/apache/accumulo/core/iterators/Filterer.java
index bda3665..6743cbc 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/Filterer.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/Filterer.java
@@ -17,8 +17,15 @@
 package org.apache.accumulo.core.iterators;
 
 /**
- * 
+ * An interface designed to be added to containers to specify what
+ * can be left out when iterating over the contents of that container.
  */
 public interface Filterer<K,V> {
-  public void applyFilter(Predicate<K,V> filter);
+  /**
+   * Either optionally or always leave out entries for which the given Predicate evaluates to false 
+   * @param filter The predicate that specifies whether an entry can be left out
+   * @param required If true, entries that don't pass the filter must be left out. If false, then treat
+   *          purely as a potential optimization.
+   */
+  public void applyFilter(Predicate<K,V> filter, boolean required);
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/af8cefa7/core/src/main/java/org/apache/accumulo/core/iterators/WrappingIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/WrappingIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/WrappingIterator.java
index a9c7f2d..84ffb7c 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/WrappingIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/WrappingIterator.java
@@ -25,7 +25,7 @@ import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 
-public abstract class WrappingIterator implements SortedKeyValueIterator<Key,Value> {
+public abstract class WrappingIterator implements SortedKeyValueIterator<Key,Value>, Filterer<Key,Value> {
   
   private SortedKeyValueIterator<Key,Value> source = null;
   boolean seenSeek = false;
@@ -93,4 +93,13 @@ public abstract class WrappingIterator implements SortedKeyValueIterator<Key,Val
     seenSeek = true;
   }
   
+  @SuppressWarnings("unchecked") // instanceof cannot verify the Filterer type arguments
+  @Override
+  public void applyFilter(Predicate<Key,Value> filter, boolean required) { // forwards the filter to the wrapped source
+    if(source instanceof Filterer) // the source can filter: pass the predicate on with the same required flag
+      ((Filterer<Key,Value>)source).applyFilter(filter, required);
+    else if(required) // a null or non-Filterer source cannot honor a mandatory filter; optional ones are dropped silently
+      throw new IllegalArgumentException("Cannot require filter of underlying iterator");
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/af8cefa7/core/src/main/java/org/apache/accumulo/core/iterators/predicates/ColumnVisibilityPredicate.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/predicates/ColumnVisibilityPredicate.java b/core/src/main/java/org/apache/accumulo/core/iterators/predicates/ColumnVisibilityPredicate.java
new file mode 100644
index 0000000..cb1b521
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/predicates/ColumnVisibilityPredicate.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators.predicates;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.Predicate;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+
+/**
+ * Predicate that accepts an entry when its column visibility evaluates to true for a fixed set of authorizations.
+ */
+public final class ColumnVisibilityPredicate implements Predicate<Key,Value> {
+  /** Authorizations that every key's column visibility is evaluated against. */
+  public final Authorizations auths;
+
+  /**
+   * @param auths authorizations to evaluate each key's column visibility against
+   */
+  public ColumnVisibilityPredicate(Authorizations auths) {
+    this.auths = auths;
+  }
+
+  /**
+   * Builds a ColumnVisibility from the key's visibility field and evaluates it against the authorizations.
+   *
+   * @param k key whose column visibility is checked
+   * @param v entry value; not consulted
+   * @return the result of evaluating the key's visibility against the authorizations
+   */
+  @Override
+  public boolean evaluate(Key k, Value v) {
+    return new ColumnVisibility(k.getColumnVisibility()).evaluate(auths);
+  }
+
+  @Override
+  public String toString() {
+    return "{"+auths+"}";
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/af8cefa7/core/src/main/java/org/apache/accumulo/core/iterators/system/MultiIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/system/MultiIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/system/MultiIterator.java
index f406fee..b219c2d 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/system/MultiIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/MultiIterator.java
@@ -27,7 +27,9 @@ import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.Filterer;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.Predicate;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 
 /**
@@ -37,7 +39,7 @@ import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
  * 
  */
 
-public class MultiIterator extends HeapIterator {
+public class MultiIterator extends HeapIterator implements Filterer<Key,Value> {
   
   private List<SortedKeyValueIterator<Key,Value>> iters;
   private Range fence;
@@ -111,4 +113,15 @@ public class MultiIterator extends HeapIterator {
   public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
     throw new UnsupportedOperationException();
   }
+  
+  @SuppressWarnings("unchecked") // instanceof cannot verify the Filterer type arguments
+  @Override
+  public void applyFilter(Predicate<Key,Value> filter, boolean required) { // offers the filter to every child iterator
+    for(SortedKeyValueIterator<Key,Value> skvi: iters) {
+      if(skvi instanceof Filterer)
+        ((Filterer<Key,Value>)skvi).applyFilter(filter, required);
+      else if(required) // NOTE(review): children visited earlier in this loop have already received the filter when this throws
+        throw new IllegalArgumentException("Cannot require filter of underlying iterator");
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/af8cefa7/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java
index b7069c9..bd907ed 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java
@@ -20,8 +20,10 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.accumulo.core.data.ByteSequence;
@@ -29,10 +31,12 @@ import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.PartialKey;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.Filterer;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.Predicate;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 
-public class SourceSwitchingIterator implements SortedKeyValueIterator<Key,Value>, InterruptibleIterator {
+public class SourceSwitchingIterator implements SortedKeyValueIterator<Key,Value>, InterruptibleIterator, Filterer<Key,Value> {
   
   public interface DataSource {
     boolean isCurrent();
@@ -144,6 +148,7 @@ public class SourceSwitchingIterator implements SortedKeyValueIterator<Key,Value
     while (!source.isCurrent()) {
       source = source.getNewDataSource();
       iter = source.iterator();
+      applyExistingFilters();
       if (iflag != null)
         ((InterruptibleIterator) iter).setInterruptFlag(iflag);
       
@@ -161,6 +166,7 @@ public class SourceSwitchingIterator implements SortedKeyValueIterator<Key,Value
     
     if (iter == null) {
       iter = source.iterator();
+      applyExistingFilters();
       if (iflag != null)
         ((InterruptibleIterator) iter).setInterruptFlag(iflag);
     }
@@ -197,4 +203,31 @@ public class SourceSwitchingIterator implements SortedKeyValueIterator<Key,Value
     
   }
   
+  private Map<Predicate<Key,Value>,Boolean> filters = new HashMap<Predicate<Key,Value>,Boolean>(); // filter -> required flag
+  // Replays every saved filter onto the current iter; HashMap iteration means the replay order is unspecified.
+  private void applyExistingFilters()
+  {
+    for(Entry<Predicate<Key,Value>,Boolean> filter:filters.entrySet())
+    {
+      _applyFilter(filter.getKey(), filter.getValue());
+    }
+  }
+  
+  @SuppressWarnings("unchecked") // instanceof cannot verify the Filterer type arguments
+  private void _applyFilter(Predicate<Key,Value> filter, boolean required) // pushes one filter to the current iter, if any
+  {
+    if(iter != null && iter instanceof Filterer)
+      ((Filterer<Key,Value>)iter).applyFilter(filter, required);
+    else if(iter != null && required) // with no iter yet nothing happens, even for a required filter
+      throw new IllegalArgumentException("Cannot require filter of underlying iterator");
+  }
+  
+  @Override
+  public void applyFilter(Predicate<Key,Value> filter, boolean required) {
+    // apply filter to the current data source; if this throws, the filter is not saved below
+    _applyFilter(filter,required);
+    // save filter for application to future data sources (replayed by applyExistingFilters)
+    filters.put(filter, required);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/af8cefa7/core/src/main/java/org/apache/accumulo/core/iterators/system/SynchronizedIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/system/SynchronizedIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/system/SynchronizedIterator.java
index 2657bab..b8227be 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/system/SynchronizedIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/SynchronizedIterator.java
@@ -22,7 +22,9 @@ import java.util.Map;
 
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.iterators.Filterer;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.Predicate;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -30,7 +32,7 @@ import org.apache.hadoop.io.WritableComparable;
 /***
  * SynchronizedIterator: wrap a SortedKeyValueIterator so that all of its methods are synchronized
  */
-public class SynchronizedIterator<K extends WritableComparable<?>,V extends Writable> implements SortedKeyValueIterator<K,V> {
+public class SynchronizedIterator<K extends WritableComparable<?>,V extends Writable> implements SortedKeyValueIterator<K,V>, Filterer<K,V> {
   
   private SortedKeyValueIterator<K,V> source = null;
   
@@ -75,4 +77,13 @@ public class SynchronizedIterator<K extends WritableComparable<?>,V extends Writ
   public SynchronizedIterator(SortedKeyValueIterator<K,V> source) {
     this.source = source;
   }
+
+  @SuppressWarnings("unchecked") // instanceof cannot verify the Filterer type arguments
+  @Override
+  public synchronized void applyFilter(Predicate<K,V> filter, boolean required) { // synchronized, as the class comment says every method of this wrapper is
+    if(source instanceof Filterer) // the source can filter: pass the predicate on with the same required flag
+      ((Filterer<K,V>)source).applyFilter(filter, required);
+    else if(required) // a non-Filterer source cannot honor a mandatory filter; optional ones are dropped silently
+      throw new IllegalArgumentException("cannot guarantee filter with non filterer source");
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/af8cefa7/core/src/main/java/org/apache/accumulo/core/iterators/system/TimeSettingIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/system/TimeSettingIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/system/TimeSettingIterator.java
index 4eef14d..1dff72f 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/system/TimeSettingIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/TimeSettingIterator.java
@@ -25,11 +25,13 @@ import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.Filterer;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.iterators.Predicate;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 
-public class TimeSettingIterator implements InterruptibleIterator {
+public class TimeSettingIterator implements InterruptibleIterator, Filterer<Key,Value> {
   
   private SortedKeyValueIterator<Key,Value> source;
   private long time;
@@ -88,5 +90,14 @@ public class TimeSettingIterator implements InterruptibleIterator {
   public Value getTopValue() {
     return source.getTopValue();
   }
+
+  @SuppressWarnings("unchecked") // instanceof cannot verify the Filterer type arguments
+  @Override
+  public void applyFilter(Predicate<Key,Value> filter, boolean required) { // forwards the filter to the wrapped source
+    if(source instanceof Filterer) // the source can filter: pass the predicate on with the same required flag
+      ((Filterer<Key,Value>)source).applyFilter(filter, required);
+    else if(required) // a non-Filterer source cannot honor a mandatory filter; optional ones are dropped silently
+      throw new IllegalArgumentException("cannot guarantee filter with non filterer source");
+  }
   
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/af8cefa7/core/src/main/java/org/apache/accumulo/core/iterators/system/VisibilityFilter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/system/VisibilityFilter.java b/core/src/main/java/org/apache/accumulo/core/iterators/system/VisibilityFilter.java
index a4391c0..2c05a03 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/system/VisibilityFilter.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/VisibilityFilter.java
@@ -16,11 +16,16 @@
  */
 package org.apache.accumulo.core.iterators.system;
 
+import java.io.IOException;
+import java.util.Map;
+
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.Filter;
+import org.apache.accumulo.core.iterators.Filterer;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.predicates.ColumnVisibilityPredicate;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.accumulo.core.util.TextUtil;
@@ -43,7 +48,14 @@ public class VisibilityFilter extends Filter {
     this.cache = new LRUMap(1000);
     this.tmpVis = new Text();
   }
-  
+
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+    super.init(source, options, env);
+    if(source instanceof Filterer) // NOTE(review): deepCopy builds copies through the constructor; confirm they also push the predicate down
+      ((Filterer<Key,Value>)source).applyFilter(new ColumnVisibilityPredicate(auths), false); // required=false: an optional hint only; NOTE(review): unchecked cast is not suppressed here
+  }
+
   @Override
   public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
     return new VisibilityFilter(getSource().deepCopy(env), auths, TextUtil.getBytes(defaultVisibility));

http://git-wip-us.apache.org/repos/asf/accumulo/blob/af8cefa7/core/src/test/java/org/apache/accumulo/core/file/rfile/AuthorizationFilterTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/AuthorizationFilterTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/AuthorizationFilterTest.java
new file mode 100644
index 0000000..7dac68b
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/AuthorizationFilterTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import static org.junit.Assert.*;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.Collections;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
+import org.apache.accumulo.core.file.rfile.RFileTest.SeekableByteArrayInputStream;
+import org.apache.accumulo.core.iterators.Predicate;
+import org.apache.accumulo.core.iterators.predicates.ColumnVisibilityPredicate;
+import org.apache.accumulo.core.iterators.predicates.TimestampRangePredicate;
+import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.junit.Test;
+
+public class AuthorizationFilterTest {
+  /** Checks that an RFile reader given a required ColumnVisibilityPredicate returns exactly the entries the predicate accepts. */
+  @Test
+  public void testRFileAuthorizationFiltering() throws Exception {
+    Authorizations auths = new Authorizations("a", "b", "c");
+    Predicate<Key,Value> columnVisibilityPredicate = new ColumnVisibilityPredicate(auths);
+    int expected = 0;
+    Random r = new Random(652); // fixed seed so that a failing run can be reproduced
+    Configuration conf = new Configuration();
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    FSDataOutputStream dos = new FSDataOutputStream(baos, new FileSystem.Statistics("a"));
+    CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(dos, "gz", conf);
+    RFile.Writer writer = new RFile.Writer(_cbw, 1000, 1000);
+    writer.startDefaultLocalityGroup();
+    byte[] row = new byte[10];
+    byte[] colFam = new byte[10];
+    byte[] colQual = new byte[10];
+    Value value = new Value(new byte[0]);
+    TreeMap<Key,Value> inputBuffer = new TreeMap<Key,Value>();
+    ColumnVisibility[] goodColVises = {new ColumnVisibility("a&b"), new ColumnVisibility("b&c"), new ColumnVisibility("a&c")};
+    ColumnVisibility[] badColVises = {new ColumnVisibility("x"), new ColumnVisibility("y"), new ColumnVisibility("a&z")};
+    for (ColumnVisibility colVis : goodColVises)
+      for (int i = 0; i < 10; i++) {
+        r.nextBytes(row);
+        r.nextBytes(colFam);
+        r.nextBytes(colQual);
+        Key k = new Key(row, colFam, colQual, colVis.getExpression(), (long) i);
+        if (columnVisibilityPredicate.evaluate(k, value))
+          expected++;
+        inputBuffer.put(k, value);
+      }
+    for (ColumnVisibility colVis : badColVises)
+      for (int i = 0; i < 10000; i++) {
+        r.nextBytes(row);
+        r.nextBytes(colFam);
+        r.nextBytes(colQual);
+        Key k = new Key(row, colFam, colQual, colVis.getExpression(), (long) i);
+        if (columnVisibilityPredicate.evaluate(k, value))
+          expected++;
+        inputBuffer.put(k, value);
+      }
+    for (Entry<Key,Value> e : inputBuffer.entrySet()) {
+      writer.append(e.getKey(), e.getValue());
+    }
+    writer.close();
+    
+    // scan the RFile to bring back only the keys visible to the given authorizations
+    byte[] data = baos.toByteArray();
+    
+    ByteArrayInputStream bais = new SeekableByteArrayInputStream(data);
+    FSDataInputStream in = new FSDataInputStream(bais);
+    CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(in, data.length, conf);
+    RFile.Reader reader = new RFile.Reader(_cbr);
+    int count = 0;
+    reader.applyFilter(columnVisibilityPredicate,true);
+    reader.seek(new Range(), Collections.EMPTY_SET, false);
+    while (reader.hasTop()) {
+      count++;
+      assertTrue(columnVisibilityPredicate.evaluate(reader.getTopKey(), reader.getTopValue()));
+      reader.next();
+    }
+    assertEquals(expected, count);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/af8cefa7/core/src/test/java/org/apache/accumulo/core/file/rfile/TimestampFilterTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/TimestampFilterTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/TimestampFilterTest.java
index 463c779..160d7bd 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/TimestampFilterTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/TimestampFilterTest.java
@@ -45,7 +45,6 @@ public class TimestampFilterTest {
   
   @Test
   public void testRFileTimestampFiltering() throws Exception {
-    // TODO create an RFile with increasing timestamp and random key order
     Predicate<Key,Value> timeRange = new TimestampRangePredicate(73, 117);
     int expected = 0;
     Random r = new Random();
@@ -84,7 +83,7 @@ public class TimestampFilterTest {
     CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(in, data.length, conf);
     RFile.Reader reader = new RFile.Reader(_cbr);
     int count = 0;
-    reader.applyFilter(timeRange);
+    reader.applyFilter(timeRange,true);
     reader.seek(new Range(), Collections.EMPTY_SET, false);
     while(reader.hasTop())
     {