You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by af...@apache.org on 2012/06/29 15:11:50 UTC

svn commit: r1355353 - in /accumulo/branches/ACCUMULO-652/core/src: main/java/org/apache/accumulo/core/file/rfile/ main/java/org/apache/accumulo/core/iterators/ main/java/org/apache/accumulo/core/iterators/predicates/ main/java/org/apache/accumulo/core...

Author: afuchs
Date: Fri Jun 29 13:11:48 2012
New Revision: 1355353

URL: http://svn.apache.org/viewvc?rev=1355353&view=rev
Log:
ACCUMULO-652 added column visibility block indexing and unit test, added Filterer interface to the system iterators, made VisibilityFilter use the new filtering capability, added notion of optional filtering to the Filterer interface

Added:
    accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockStats.java
    accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/predicates/ColumnVisibilityPredicate.java
    accumulo/branches/ACCUMULO-652/core/src/test/java/org/apache/accumulo/core/file/rfile/AuthorizationFilterTest.java
Modified:
    accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
    accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/Filterer.java
    accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/WrappingIterator.java
    accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/MultiIterator.java
    accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java
    accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/SynchronizedIterator.java
    accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/TimeSettingIterator.java
    accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/VisibilityFilter.java
    accumulo/branches/ACCUMULO-652/core/src/test/java/org/apache/accumulo/core/file/rfile/TimestampFilterTest.java

Added: accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockStats.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockStats.java?rev=1355353&view=auto
==============================================================================
--- accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockStats.java (added)
+++ accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockStats.java Fri Jun 29 13:11:48 2012
@@ -0,0 +1,117 @@
+package org.apache.accumulo.core.file.rfile;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.hadoop.io.Writable;
+
+public class BlockStats implements Writable {
+  
+  private static ColumnVisibility emptyVisibility = new ColumnVisibility();
+  private static int maxVisibilityLength = 100;
+  
+  public BlockStats(long minTimestamp, long maxTimestamp, ColumnVisibility minimumVisibility, int entries) {
+    this.minTimestamp = minTimestamp;
+    this.maxTimestamp = maxTimestamp;
+    this.minimumVisibility = minimumVisibility;
+    this.entries = entries;
+    this.version = RFile.RINDEX_VER_7;
+  }
+  
+  long minTimestamp = Long.MAX_VALUE;
+  long maxTimestamp = Long.MIN_VALUE;
+  ColumnVisibility minimumVisibility = null;
+  int entries = 0;
+  final int version;
+  
+  public void updateBlockStats(Key key, Value value) {
+    if (minTimestamp > key.getTimestamp())
+      minTimestamp = key.getTimestamp();
+    if (maxTimestamp < key.getTimestamp())
+      maxTimestamp = key.getTimestamp();
+    entries++;
+    if (key.getColumnVisibilityData().length() > 0)
+      combineVisibilities(new ColumnVisibility(key.getColumnVisibility()));
+    else
+      combineVisibilities(emptyVisibility);
+  }
+  
+  private void combineVisibilities(ColumnVisibility other) {
+    if (minimumVisibility == null)
+      minimumVisibility = other;
+    else
+      minimumVisibility = minimumVisibility.or(other);
+  }
+  
+  public void updateBlockStats(BlockStats other) {
+    this.entries += other.entries;
+    if (this.minTimestamp > other.minTimestamp)
+      this.minTimestamp = other.minTimestamp;
+    if (this.maxTimestamp < other.maxTimestamp)
+      this.maxTimestamp = other.maxTimestamp;
+    combineVisibilities(other.minimumVisibility);
+  }
+  
+  public BlockStats() {
+    minTimestamp = Long.MAX_VALUE;
+    maxTimestamp = Long.MIN_VALUE;
+    minimumVisibility = null;
+    entries = 0;
+    version = RFile.RINDEX_VER_7;
+  }
+  
+  public BlockStats(int version) {
+    this.version = version;
+  }
+  
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    if (version == RFile.RINDEX_VER_7) {
+      minTimestamp = in.readLong();
+      maxTimestamp = in.readLong();
+      int visibilityLength = in.readInt();
+      if (visibilityLength >= 0) {
+        byte[] visibility = new byte[visibilityLength];
+        in.readFully(visibility);
+        minimumVisibility = new ColumnVisibility(visibility);
+      } else {
+        minimumVisibility = null;
+      }
+    } else {
+      minTimestamp = Long.MIN_VALUE;
+      maxTimestamp = Long.MAX_VALUE;
+      minimumVisibility = null;
+    }
+    entries = in.readInt();
+  }
+  
+  @Override
+  public void write(DataOutput out) throws IOException {
+    if (version == RFile.RINDEX_VER_7) {
+      out.writeLong(minTimestamp);
+      out.writeLong(maxTimestamp);
+      if (minimumVisibility == null)
+        out.writeInt(-1);
+      else {
+        byte[] visibility = minimumVisibility.getExpression();
+        if (visibility.length > maxVisibilityLength) {
+          System.out.println("expression too large: "+toString());
+          out.writeInt(0);
+        } else {
+          out.writeInt(visibility.length);
+          out.write(visibility);
+        }
+      }
+    }
+    out.writeInt(entries);
+  }
+  
+  @Override
+  public String toString() {
+    return "{"+entries+";"+minTimestamp+";"+maxTimestamp+";"+minimumVisibility+"}";
+  }
+}

Modified: accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java?rev=1355353&r1=1355352&r2=1355353&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java (original)
+++ accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java Fri Jun 29 13:11:48 2012
@@ -784,7 +784,9 @@ public class RFile {
      * @see org.apache.accumulo.core.iterators.Filterer#applyFilter(org.apache.accumulo.core.iterators.Predicate)
      */
     @Override
-    public void applyFilter(Predicate<Key,Value> filter) {
+    public void applyFilter(Predicate<Key,Value> filter, boolean required) {
+      if(required)
+        throw new UnsupportedOperationException("Cannot guarantee filtration");
       // TODO support general filters
       if(filter instanceof TimestampRangePredicate)
       {
@@ -797,16 +799,12 @@ public class RFile {
           timestampRange = p;
         index.setTimestampRange(timestampRange);
       }
-      else if(filter instanceof ColumnVisibilityPredicate)
+      if(filter instanceof ColumnVisibilityPredicate)
       {
     	  filterChanged = true;
     	  columnVisibilityPredicate = (ColumnVisibilityPredicate)filter;
     	  index.setColumnVisibilityPredicate(columnVisibilityPredicate);
       }
-      else
-      {
-        throw new RuntimeException("yikes, not yet implemented");
-      }
     }
   }
   
@@ -1042,7 +1040,9 @@ public class RFile {
         
         if (include) {
           if(timestampFilter != null)
-            lgr.applyFilter(timestampFilter);
+            lgr.applyFilter(timestampFilter,false);
+          if(columnVisibilityPredicate != null)
+            lgr.applyFilter(columnVisibilityPredicate,false);
           lgr.seek(range, EMPTY_CF_SET, false);
           addSource(lgr);
           numLGSeeked++;
@@ -1093,6 +1093,7 @@ public class RFile {
     ArrayList<Predicate<Key,Value>> filters = new ArrayList<Predicate<Key,Value>>();
     
     TimestampRangePredicate timestampFilter = null;
+    ColumnVisibilityPredicate columnVisibilityPredicate = null;
     
     Key topKey;
     Value topValue;
@@ -1171,11 +1172,14 @@ public class RFile {
      * @see org.apache.accumulo.core.iterators.Filterer#applyFilter(org.apache.accumulo.core.iterators.Predicate)
      */
     @Override
-    public void applyFilter(Predicate<Key,Value> filter) {
-      filters.add(filter);
+    public void applyFilter(Predicate<Key,Value> filter, boolean required) {
+      if(required)
+        filters.add(filter);
       // the HeapIterator will pass this filter on to its children, a collection of LocalityGroupReaders
       if(filter instanceof TimestampRangePredicate)
         this.timestampFilter = (TimestampRangePredicate)filter;
+      if(filter instanceof ColumnVisibilityPredicate)
+        this.columnVisibilityPredicate = (ColumnVisibilityPredicate)filter;
     }
   }
   

Modified: accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/Filterer.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/Filterer.java?rev=1355353&r1=1355352&r2=1355353&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/Filterer.java (original)
+++ accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/Filterer.java Fri Jun 29 13:11:48 2012
@@ -17,8 +17,15 @@
 package org.apache.accumulo.core.iterators;
 
 /**
- * 
+ * An interface designed to be added to containers to specify what
+ * can be left out when iterating over the contents of that container.
  */
 public interface Filterer<K,V> {
-  public void applyFilter(Predicate<K,V> filter);
+  /**
+   * Either optionally or always leave out entries for which the given Predicate evaluates to false 
+   * @param filter The predicate that specifies whether an entry can be left out
+   * @param required If true, entries that don't pass the filter must be left out. If false, then treat
+   *          purely as a potential optimization.
+   */
+  public void applyFilter(Predicate<K,V> filter, boolean required);
 }

Modified: accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/WrappingIterator.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/WrappingIterator.java?rev=1355353&r1=1355352&r2=1355353&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/WrappingIterator.java (original)
+++ accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/WrappingIterator.java Fri Jun 29 13:11:48 2012
@@ -25,7 +25,7 @@ import org.apache.accumulo.core.data.Key
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 
-public abstract class WrappingIterator implements SortedKeyValueIterator<Key,Value> {
+public abstract class WrappingIterator implements SortedKeyValueIterator<Key,Value>, Filterer<Key,Value> {
   
   private SortedKeyValueIterator<Key,Value> source = null;
   boolean seenSeek = false;
@@ -93,4 +93,13 @@ public abstract class WrappingIterator i
     seenSeek = true;
   }
   
+  @SuppressWarnings("unchecked")
+  @Override
+  public void applyFilter(Predicate<Key,Value> filter, boolean required) {
+    if(source instanceof Filterer)
+      ((Filterer<Key,Value>)source).applyFilter(filter, required);
+    else if(required)
+      throw new IllegalArgumentException("Cannot require filter of underlying iterator");
+  }
+
 }

Added: accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/predicates/ColumnVisibilityPredicate.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/predicates/ColumnVisibilityPredicate.java?rev=1355353&view=auto
==============================================================================
--- accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/predicates/ColumnVisibilityPredicate.java (added)
+++ accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/predicates/ColumnVisibilityPredicate.java Fri Jun 29 13:11:48 2012
@@ -0,0 +1,26 @@
+package org.apache.accumulo.core.iterators.predicates;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.Predicate;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+
+public final class ColumnVisibilityPredicate implements Predicate<Key, Value> {
+	public final Authorizations auths;
+
+	public ColumnVisibilityPredicate(Authorizations auths)
+	{
+		this.auths = auths;
+	}
+	
+	@Override
+	public boolean evaluate(Key k, Value v) {
+		return new ColumnVisibility(k.getColumnVisibility()).evaluate(auths);
+	}
+	
+	@Override
+	public String toString() {
+	  return "{"+auths+"}";
+	}
+}

Modified: accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/MultiIterator.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/MultiIterator.java?rev=1355353&r1=1355352&r2=1355353&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/MultiIterator.java (original)
+++ accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/MultiIterator.java Fri Jun 29 13:11:48 2012
@@ -27,7 +27,9 @@ import org.apache.accumulo.core.data.Key
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.Filterer;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.Predicate;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 
 /**
@@ -37,7 +39,7 @@ import org.apache.accumulo.core.iterator
  * 
  */
 
-public class MultiIterator extends HeapIterator {
+public class MultiIterator extends HeapIterator implements Filterer<Key,Value> {
   
   private List<SortedKeyValueIterator<Key,Value>> iters;
   private Range fence;
@@ -111,4 +113,15 @@ public class MultiIterator extends HeapI
   public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
     throw new UnsupportedOperationException();
   }
+  
+  @SuppressWarnings("unchecked")
+  @Override
+  public void applyFilter(Predicate<Key,Value> filter, boolean required) {
+    for(SortedKeyValueIterator<Key,Value> skvi: iters) {
+      if(skvi instanceof Filterer)
+        ((Filterer<Key,Value>)skvi).applyFilter(filter, required);
+      else if(required)
+        throw new IllegalArgumentException("Cannot require filter of underlying iterator");
+    }
+  }
 }

Modified: accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java?rev=1355353&r1=1355352&r2=1355353&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java (original)
+++ accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java Fri Jun 29 13:11:48 2012
@@ -20,8 +20,10 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.accumulo.core.data.ByteSequence;
@@ -29,10 +31,12 @@ import org.apache.accumulo.core.data.Key
 import org.apache.accumulo.core.data.PartialKey;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.Filterer;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.Predicate;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 
-public class SourceSwitchingIterator implements SortedKeyValueIterator<Key,Value>, InterruptibleIterator {
+public class SourceSwitchingIterator implements SortedKeyValueIterator<Key,Value>, InterruptibleIterator, Filterer<Key,Value> {
   
   public interface DataSource {
     boolean isCurrent();
@@ -144,6 +148,7 @@ public class SourceSwitchingIterator imp
     while (!source.isCurrent()) {
       source = source.getNewDataSource();
       iter = source.iterator();
+      applyExistingFilters();
       if (iflag != null)
         ((InterruptibleIterator) iter).setInterruptFlag(iflag);
       
@@ -161,6 +166,7 @@ public class SourceSwitchingIterator imp
     
     if (iter == null) {
       iter = source.iterator();
+      applyExistingFilters();
       if (iflag != null)
         ((InterruptibleIterator) iter).setInterruptFlag(iflag);
     }
@@ -197,4 +203,31 @@ public class SourceSwitchingIterator imp
     
   }
   
+  private Map<Predicate<Key,Value>,Boolean> filters = new HashMap<Predicate<Key,Value>,Boolean>();
+  
+  private void applyExistingFilters()
+  {
+    for(Entry<Predicate<Key,Value>,Boolean> filter:filters.entrySet())
+    {
+      _applyFilter(filter.getKey(), filter.getValue());
+    }
+  }
+  
+  @SuppressWarnings("unchecked")
+  private void _applyFilter(Predicate<Key,Value> filter, boolean required)
+  {
+    if(iter != null && iter instanceof Filterer)
+      ((Filterer<Key,Value>)iter).applyFilter(filter, required);
+    else if(iter != null && required)
+      throw new IllegalArgumentException("Cannot require filter of underlying iterator");
+  }
+  
+  @Override
+  public void applyFilter(Predicate<Key,Value> filter, boolean required) {
+    // apply filter to the current data source
+    _applyFilter(filter,required);
+    // save filter for application to future data sources
+    filters.put(filter, required);
+  }
+
 }

Modified: accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/SynchronizedIterator.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/SynchronizedIterator.java?rev=1355353&r1=1355352&r2=1355353&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/SynchronizedIterator.java (original)
+++ accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/SynchronizedIterator.java Fri Jun 29 13:11:48 2012
@@ -22,7 +22,9 @@ import java.util.Map;
 
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.iterators.Filterer;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.Predicate;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -30,7 +32,7 @@ import org.apache.hadoop.io.WritableComp
 /***
  * SynchronizedIterator: wrap a SortedKeyValueIterator so that all of its methods are synchronized
  */
-public class SynchronizedIterator<K extends WritableComparable<?>,V extends Writable> implements SortedKeyValueIterator<K,V> {
+public class SynchronizedIterator<K extends WritableComparable<?>,V extends Writable> implements SortedKeyValueIterator<K,V>, Filterer<K,V> {
   
   private SortedKeyValueIterator<K,V> source = null;
   
@@ -75,4 +77,13 @@ public class SynchronizedIterator<K exte
   public SynchronizedIterator(SortedKeyValueIterator<K,V> source) {
     this.source = source;
   }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void applyFilter(Predicate<K,V> filter, boolean required) {
+    if(source instanceof Filterer)
+      ((Filterer<K,V>)source).applyFilter(filter, required);
+    else if(required)
+      throw new IllegalArgumentException("cannot guarantee filter with non filterer source");
+  }
 }

Modified: accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/TimeSettingIterator.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/TimeSettingIterator.java?rev=1355353&r1=1355352&r2=1355353&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/TimeSettingIterator.java (original)
+++ accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/TimeSettingIterator.java Fri Jun 29 13:11:48 2012
@@ -25,11 +25,13 @@ import org.apache.accumulo.core.data.Byt
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.Filterer;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.iterators.Predicate;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 
-public class TimeSettingIterator implements InterruptibleIterator {
+public class TimeSettingIterator implements InterruptibleIterator, Filterer<Key,Value> {
   
   private SortedKeyValueIterator<Key,Value> source;
   private long time;
@@ -88,5 +90,14 @@ public class TimeSettingIterator impleme
   public Value getTopValue() {
     return source.getTopValue();
   }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void applyFilter(Predicate<Key,Value> filter, boolean required) {
+    if(source instanceof Filterer)
+      ((Filterer<Key,Value>)source).applyFilter(filter, required);
+    else if(required)
+      throw new IllegalArgumentException("cannot guarantee filter with non filterer source");
+  }
   
 }

Modified: accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/VisibilityFilter.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/VisibilityFilter.java?rev=1355353&r1=1355352&r2=1355353&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/VisibilityFilter.java (original)
+++ accumulo/branches/ACCUMULO-652/core/src/main/java/org/apache/accumulo/core/iterators/system/VisibilityFilter.java Fri Jun 29 13:11:48 2012
@@ -16,11 +16,16 @@
  */
 package org.apache.accumulo.core.iterators.system;
 
+import java.io.IOException;
+import java.util.Map;
+
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.Filter;
+import org.apache.accumulo.core.iterators.Filterer;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.predicates.ColumnVisibilityPredicate;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.accumulo.core.util.TextUtil;
@@ -43,7 +48,14 @@ public class VisibilityFilter extends Fi
     this.cache = new LRUMap(1000);
     this.tmpVis = new Text();
   }
-  
+
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+    super.init(source, options, env);
+    if(source instanceof Filterer)
+      ((Filterer<Key,Value>)source).applyFilter(new ColumnVisibilityPredicate(auths), false);
+  }
+
   @Override
   public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
     return new VisibilityFilter(getSource().deepCopy(env), auths, TextUtil.getBytes(defaultVisibility));

Added: accumulo/branches/ACCUMULO-652/core/src/test/java/org/apache/accumulo/core/file/rfile/AuthorizationFilterTest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/core/src/test/java/org/apache/accumulo/core/file/rfile/AuthorizationFilterTest.java?rev=1355353&view=auto
==============================================================================
--- accumulo/branches/ACCUMULO-652/core/src/test/java/org/apache/accumulo/core/file/rfile/AuthorizationFilterTest.java (added)
+++ accumulo/branches/ACCUMULO-652/core/src/test/java/org/apache/accumulo/core/file/rfile/AuthorizationFilterTest.java Fri Jun 29 13:11:48 2012
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import static org.junit.Assert.*;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.Collections;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
+import org.apache.accumulo.core.file.rfile.RFileTest.SeekableByteArrayInputStream;
+import org.apache.accumulo.core.iterators.Predicate;
+import org.apache.accumulo.core.iterators.predicates.ColumnVisibilityPredicate;
+import org.apache.accumulo.core.iterators.predicates.TimestampRangePredicate;
+import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.junit.Test;
+
+public class AuthorizationFilterTest {
+  
+  @Test
+  public void testRFileAuthorizationFiltering() throws Exception {
+    Authorizations auths = new Authorizations("a", "b", "c");
+    Predicate<Key,Value> columnVisibilityPredicate = new ColumnVisibilityPredicate(auths);
+    int expected = 0;
+    Random r = new Random();
+    Configuration conf = new Configuration();
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    FSDataOutputStream dos = new FSDataOutputStream(baos, new FileSystem.Statistics("a"));
+    CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(dos, "gz", conf);
+    RFile.Writer writer = new RFile.Writer(_cbw, 1000, 1000);
+    writer.startDefaultLocalityGroup();
+    byte[] row = new byte[10];
+    byte[] colFam = new byte[10];
+    byte[] colQual = new byte[10];
+    Value value = new Value(new byte[0]);
+    TreeMap<Key,Value> inputBuffer = new TreeMap<Key,Value>();
+    ColumnVisibility[] goodColVises = {new ColumnVisibility("a&b"), new ColumnVisibility("b&c"), new ColumnVisibility("a&c")};
+    ColumnVisibility[] badColVises = {new ColumnVisibility("x"), new ColumnVisibility("y"), new ColumnVisibility("a&z")};
+    for (ColumnVisibility colVis : goodColVises)
+      for (int i = 0; i < 10; i++) {
+        r.nextBytes(row);
+        r.nextBytes(colFam);
+        r.nextBytes(colQual);
+        Key k = new Key(row, colFam, colQual, colVis.getExpression(), (long) i);
+        if (columnVisibilityPredicate.evaluate(k, value))
+          expected++;
+        inputBuffer.put(k, value);
+      }
+    for (ColumnVisibility colVis : badColVises)
+      for (int i = 0; i < 10000; i++) {
+        r.nextBytes(row);
+        r.nextBytes(colFam);
+        r.nextBytes(colQual);
+        Key k = new Key(row, colFam, colQual, colVis.getExpression(), (long) i);
+        if (columnVisibilityPredicate.evaluate(k, value))
+          expected++;
+        inputBuffer.put(k, value);
+      }
+    for (Entry<Key,Value> e : inputBuffer.entrySet()) {
+      writer.append(e.getKey(), e.getValue());
+    }
+    writer.close();
+    
+    // scan the RFile to bring back keys in a given timestamp range
+    byte[] data = baos.toByteArray();
+    
+    ByteArrayInputStream bais = new SeekableByteArrayInputStream(data);
+    FSDataInputStream in = new FSDataInputStream(bais);
+    CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(in, data.length, conf);
+    RFile.Reader reader = new RFile.Reader(_cbr);
+    int count = 0;
+    reader.applyFilter(columnVisibilityPredicate,true);
+    reader.seek(new Range(), Collections.EMPTY_SET, false);
+    while (reader.hasTop()) {
+      count++;
+      assertTrue(columnVisibilityPredicate.evaluate(reader.getTopKey(), reader.getTopValue()));
+      reader.next();
+    }
+    assertEquals(expected, count);
+  }
+}

Modified: accumulo/branches/ACCUMULO-652/core/src/test/java/org/apache/accumulo/core/file/rfile/TimestampFilterTest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-652/core/src/test/java/org/apache/accumulo/core/file/rfile/TimestampFilterTest.java?rev=1355353&r1=1355352&r2=1355353&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-652/core/src/test/java/org/apache/accumulo/core/file/rfile/TimestampFilterTest.java (original)
+++ accumulo/branches/ACCUMULO-652/core/src/test/java/org/apache/accumulo/core/file/rfile/TimestampFilterTest.java Fri Jun 29 13:11:48 2012
@@ -45,7 +45,6 @@ public class TimestampFilterTest {
   
   @Test
   public void testRFileTimestampFiltering() throws Exception {
-    // TODO create an RFile with increasing timestamp and random key order
     Predicate<Key,Value> timeRange = new TimestampRangePredicate(73, 117);
     int expected = 0;
     Random r = new Random();
@@ -84,7 +83,7 @@ public class TimestampFilterTest {
     CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(in, data.length, conf);
     RFile.Reader reader = new RFile.Reader(_cbr);
     int count = 0;
-    reader.applyFilter(timeRange);
+    reader.applyFilter(timeRange,true);
     reader.seek(new Range(), Collections.EMPTY_SET, false);
     while(reader.hasTop())
     {