You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/09/22 22:45:01 UTC

[03/15] git commit: ACCUMULO-652 initial mods to RFile to keep track of extra block statistics

ACCUMULO-652 initial mods to RFile to keep track of extra block statistics

git-svn-id: https://svn.apache.org/repos/asf/accumulo/branches/ACCUMULO-652@1354475 13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/3fcd07de
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/3fcd07de
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/3fcd07de

Branch: refs/heads/ACCUMULO-652
Commit: 3fcd07de50699ae829ddbe892579126837306ab4
Parents: fd77a56
Author: Adam Fuchs <af...@apache.org>
Authored: Wed Jun 27 12:48:16 2012 +0000
Committer: Adam Fuchs <af...@apache.org>
Committed: Wed Jun 27 12:48:16 2012 +0000

----------------------------------------------------------------------
 .../core/file/rfile/MultiLevelIndex.java        | 457 ++++++++++---------
 .../apache/accumulo/core/file/rfile/RFile.java  | 183 +++++++-
 .../accumulo/core/iterators/Filterer.java       |  24 +
 .../accumulo/core/iterators/Predicate.java      |  24 +
 .../predicates/TimestampRangePredicate.java     |  54 +++
 .../core/iterators/system/HeapIterator.java     |  14 +-
 .../core/iterators/system/VisibilityFilter.java |  21 +-
 .../core/security/ColumnVisibility.java         | 342 +++++++++-----
 .../core/security/VisibilityConstraint.java     |   9 +-
 .../core/file/rfile/MultiLevelIndexTest.java    |  16 +-
 .../accumulo/core/file/rfile/RFileTest.java     |  45 +-
 .../core/file/rfile/TimestampFilterTest.java    |  98 ++++
 .../iterators/user/IndexedDocIteratorTest.java  |  14 +-
 .../core/security/ColumnVisibilityTest.java     |  60 ++-
 .../core/security/VisibilityEvaluatorTest.java  |  20 +-
 .../examples/wikisearch/parser/EventFields.java |   6 +-
 16 files changed, 983 insertions(+), 404 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fcd07de/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
index b973cc3..e2b4b15 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
@@ -27,10 +27,11 @@ import java.util.AbstractList;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.Iterator;
 import java.util.List;
-import java.util.ListIterator;
 import java.util.Map;
 import java.util.RandomAccess;
+import java.util.Stack;
 
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.file.blockfile.ABlockReader;
@@ -38,37 +39,58 @@ import org.apache.accumulo.core.file.blockfile.ABlockWriter;
 import org.apache.accumulo.core.file.blockfile.BlockFileReader;
 import org.apache.accumulo.core.file.blockfile.BlockFileWriter;
 import org.apache.accumulo.core.file.rfile.bcfile.Utils;
+import org.apache.accumulo.core.iterators.predicates.TimestampRangePredicate;
+import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.hadoop.io.WritableComparable;
 
 public class MultiLevelIndex {
   
   public static class IndexEntry implements WritableComparable<IndexEntry> {
     private Key key;
+    private long minTimestamp;
+    private long maxTimestamp;
+    private ColumnVisibility minimumVisibility = null;
     private int entries;
     private long offset;
     private long compressedSize;
     private long rawSize;
-    private boolean newFormat;
+    private int format;
     
-    IndexEntry(Key k, int e, long offset, long compressedSize, long rawSize) {
+    IndexEntry(Key k, long minTimestamp, long maxTimestamp, ColumnVisibility minimumVisibility, int e, long offset, long compressedSize, long rawSize, int version) {
       this.key = k;
+      this.minTimestamp = minTimestamp;
+      this.maxTimestamp = maxTimestamp;
+      this.minimumVisibility = minimumVisibility;
       this.entries = e;
       this.offset = offset;
       this.compressedSize = compressedSize;
       this.rawSize = rawSize;
-      newFormat = true;
+      format = version;
     }
     
-    public IndexEntry(boolean newFormat) {
-      this.newFormat = newFormat;
+    public IndexEntry(int format) {
+      this.format = format;
     }
     
     @Override
     public void readFields(DataInput in) throws IOException {
       key = new Key();
       key.readFields(in);
+      if(format == RFile.RINDEX_VER_7)
+      {
+        minTimestamp = in.readLong();
+        maxTimestamp = in.readLong();
+        byte[] visibility = new byte[in.readInt()];
+        in.readFully(visibility);
+        minimumVisibility = new ColumnVisibility(visibility);
+      }
+      else
+      {
+        minTimestamp = Long.MIN_VALUE;
+        maxTimestamp = Long.MAX_VALUE;
+      }
       entries = in.readInt();
-      if (newFormat) {
+      if (format == RFile.RINDEX_VER_6 || format == RFile.RINDEX_VER_7) {
         offset = Utils.readVLong(in);
         compressedSize = Utils.readVLong(in);
         rawSize = Utils.readVLong(in);
@@ -82,8 +104,16 @@ public class MultiLevelIndex {
     @Override
     public void write(DataOutput out) throws IOException {
       key.write(out);
+      if(format == RFile.RINDEX_VER_7)
+      {
+        out.writeLong(minTimestamp);
+        out.writeLong(maxTimestamp);
+        byte[] visibility = minimumVisibility.getExpression();
+        out.writeInt(visibility.length);
+        out.write(visibility);
+      }
       out.writeInt(entries);
-      if (newFormat) {
+      if (format == RFile.RINDEX_VER_6 || format == RFile.RINDEX_VER_7) {
         Utils.writeVLong(out, offset);
         Utils.writeVLong(out, compressedSize);
         Utils.writeVLong(out, rawSize);
@@ -121,12 +151,12 @@ public class MultiLevelIndex {
     
     private int[] offsets;
     private byte[] data;
-    private boolean newFormat;
+    private int format;
     
-    SerializedIndex(int[] offsets, byte[] data, boolean newFormat) {
+    SerializedIndex(int[] offsets, byte[] data, int format) {
       this.offsets = offsets;
       this.data = data;
-      this.newFormat = newFormat;
+      this.format = format;
     }
     
     @Override
@@ -140,7 +170,7 @@ public class MultiLevelIndex {
       ByteArrayInputStream bais = new ByteArrayInputStream(data, offsets[index], len);
       DataInputStream dis = new DataInputStream(bais);
       
-      IndexEntry ie = new IndexEntry(newFormat);
+      IndexEntry ie = new IndexEntry(format);
       try {
         ie.readFields(dis);
       } catch (IOException e) {
@@ -203,6 +233,10 @@ public class MultiLevelIndex {
     private ByteArrayOutputStream indexBytes;
     private DataOutputStream indexOut;
     
+    private long minTimestamp = Long.MAX_VALUE;
+    private long maxTimestamp = Long.MIN_VALUE;
+    private ColumnVisibility minimumVisibility = null;
+    
     private ArrayList<Integer> offsets;
     private int level;
     private int offset;
@@ -212,8 +246,6 @@ public class MultiLevelIndex {
     private boolean hasNext;
     
     public IndexBlock(int level, int totalAdded) {
-      // System.out.println("IndexBlock("+level+","+levelCount+","+totalAdded+")");
-      
       this.level = level;
       this.offset = totalAdded;
       
@@ -224,9 +256,17 @@ public class MultiLevelIndex {
     
     public IndexBlock() {}
     
-    public void add(Key key, int value, long offset, long compressedSize, long rawSize) throws IOException {
+    public void add(Key key, long minTimestamp, long maxTimestamp, ColumnVisibility minimumVisibility, int value, long offset, long compressedSize, long rawSize, int version) throws IOException {
       offsets.add(indexOut.size());
-      new IndexEntry(key, value, offset, compressedSize, rawSize).write(indexOut);
+      if (this.minTimestamp > minTimestamp)
+        this.minTimestamp = minTimestamp;
+      if (this.maxTimestamp < maxTimestamp)
+        this.maxTimestamp = maxTimestamp;
+      if(this.minimumVisibility == null)
+        this.minimumVisibility = minimumVisibility;
+      else
+        this.minimumVisibility = this.minimumVisibility.or(minimumVisibility);
+      new IndexEntry(key, minTimestamp, maxTimestamp, minimumVisibility, value, offset, compressedSize, rawSize, version).write(indexOut);
     }
     
     int getSize() {
@@ -252,7 +292,7 @@ public class MultiLevelIndex {
     
     public void readFields(DataInput in, int version) throws IOException {
       
-      if (version == RFile.RINDEX_VER_6) {
+      if (version == RFile.RINDEX_VER_6 || version == RFile.RINDEX_VER_7) {
         level = in.readInt();
         offset = in.readInt();
         hasNext = in.readBoolean();
@@ -267,7 +307,7 @@ public class MultiLevelIndex {
         byte[] serializedIndex = new byte[indexSize];
         in.readFully(serializedIndex);
         
-        index = new SerializedIndex(offsets, serializedIndex, true);
+        index = new SerializedIndex(offsets, serializedIndex, version);
         keyIndex = new KeyIndex(offsets, serializedIndex);
       } else if (version == RFile.RINDEX_VER_3) {
         level = 0;
@@ -281,7 +321,7 @@ public class MultiLevelIndex {
         ArrayList<Integer> oal = new ArrayList<Integer>();
         
         for (int i = 0; i < size; i++) {
-          IndexEntry ie = new IndexEntry(false);
+          IndexEntry ie = new IndexEntry(version);
           oal.add(dos.size());
           ie.readFields(in);
           ie.write(dos);
@@ -295,7 +335,7 @@ public class MultiLevelIndex {
         }
         
         byte[] serializedIndex = baos.toByteArray();
-        index = new SerializedIndex(oia, serializedIndex, false);
+        index = new SerializedIndex(oia, serializedIndex, version);
         keyIndex = new KeyIndex(oia, serializedIndex);
       } else if (version == RFile.RINDEX_VER_4) {
         level = 0;
@@ -312,7 +352,7 @@ public class MultiLevelIndex {
         byte[] indexData = new byte[size];
         in.readFully(indexData);
         
-        index = new SerializedIndex(offsets, indexData, false);
+        index = new SerializedIndex(offsets, indexData, version);
         keyIndex = new KeyIndex(offsets, indexData);
       } else {
         throw new RuntimeException("Unexpected version " + version);
@@ -356,12 +396,14 @@ public class MultiLevelIndex {
     private DataOutputStream buffer;
     private int buffered;
     private ByteArrayOutputStream baos;
+    private final int version;
     
     public BufferedWriter(Writer writer) {
       this.writer = writer;
       baos = new ByteArrayOutputStream(1 << 20);
       buffer = new DataOutputStream(baos);
       buffered = 0;
+      version = RFile.RINDEX_VER_7;
     }
     
     private void flush() throws IOException {
@@ -369,10 +411,10 @@ public class MultiLevelIndex {
       
       DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
       
-      IndexEntry ie = new IndexEntry(true);
+      IndexEntry ie = new IndexEntry(version);
       for (int i = 0; i < buffered; i++) {
         ie.readFields(dis);
-        writer.add(ie.getKey(), ie.getNumEntries(), ie.getOffset(), ie.getCompressedSize(), ie.getRawSize());
+        writer.add(ie.getKey(), ie.minTimestamp, ie.maxTimestamp, ie.minimumVisibility, ie.getNumEntries(), ie.getOffset(), ie.getCompressedSize(), ie.getRawSize(), ie.format);
       }
       
       buffered = 0;
@@ -381,18 +423,18 @@ public class MultiLevelIndex {
       
     }
     
-    public void add(Key key, int data, long offset, long compressedSize, long rawSize) throws IOException {
+    public void add(Key key, long minTimestamp, long maxTimestamp, ColumnVisibility minimumVisibility, int data, long offset, long compressedSize, long rawSize, int version) throws IOException {
       if (buffer.size() > (10 * 1 << 20)) {
         flush();
       }
       
-      new IndexEntry(key, data, offset, compressedSize, rawSize).write(buffer);
+      new IndexEntry(key, minTimestamp, maxTimestamp, minimumVisibility, data, offset, compressedSize, rawSize, version).write(buffer);
       buffered++;
     }
     
-    public void addLast(Key key, int data, long offset, long compressedSize, long rawSize) throws IOException {
+    public void addLast(Key key, long minTimestamp, long maxTimestamp, ColumnVisibility minimumVisibility, int data, long offset, long compressedSize, long rawSize, int version) throws IOException {
       flush();
-      writer.addLast(key, data, offset, compressedSize, rawSize);
+      writer.addLast(key, minTimestamp, maxTimestamp, minimumVisibility, data, offset, compressedSize, rawSize, version);
     }
     
     public void close(DataOutput out) throws IOException {
@@ -417,30 +459,26 @@ public class MultiLevelIndex {
       levels = new ArrayList<IndexBlock>();
     }
     
-    private void add(int level, Key key, int data, long offset, long compressedSize, long rawSize) throws IOException {
+    private void add(int level, Key key, long minTimestamp, long maxTimestamp, ColumnVisibility minimumVisibility, int data, long offset, long compressedSize, long rawSize, boolean last, int version)
+        throws IOException {
       if (level == levels.size()) {
         levels.add(new IndexBlock(level, 0));
       }
       
       IndexBlock iblock = levels.get(level);
       
-      iblock.add(key, data, offset, compressedSize, rawSize);
-    }
-    
-    private void flush(int level, Key lastKey, boolean last) throws IOException {
+      iblock.add(key, minTimestamp, maxTimestamp, minimumVisibility, data, offset, compressedSize, rawSize, version);
       
       if (last && level == levels.size() - 1)
         return;
       
-      IndexBlock iblock = levels.get(level);
       if ((iblock.getSize() > threshold && iblock.offsets.size() > 1) || last) {
         ABlockWriter out = blockFileWriter.prepareDataBlock();
         iblock.setHasNext(!last);
         iblock.write(out);
         out.close();
         
-        add(level + 1, lastKey, 0, out.getStartPos(), out.getCompressedSize(), out.getRawSize());
-        flush(level + 1, lastKey, last);
+        add(level + 1, key, iblock.minTimestamp, iblock.maxTimestamp, iblock.minimumVisibility, 0, out.getStartPos(), out.getCompressedSize(), out.getRawSize(), last, version);
         
         if (last)
           levels.set(level, null);
@@ -449,19 +487,17 @@ public class MultiLevelIndex {
       }
     }
     
-    public void add(Key key, int data, long offset, long compressedSize, long rawSize) throws IOException {
+    public void add(Key key, long minTimestamp, long maxTimestamp, ColumnVisibility minimumVisibility, int data, long offset, long compressedSize, long rawSize, int version) throws IOException {
       totalAdded++;
-      add(0, key, data, offset, compressedSize, rawSize);
-      flush(0, key, false);
+      add(0, key, minTimestamp, maxTimestamp, minimumVisibility, data, offset, compressedSize, rawSize, false, version);
     }
     
-    public void addLast(Key key, int data, long offset, long compressedSize, long rawSize) throws IOException {
+    public void addLast(Key key, long minTimestamp, long maxTimestamp, ColumnVisibility minimumVisibility, int data, long offset, long compressedSize, long rawSize, int version) throws IOException {
       if (addedLast)
         throw new IllegalStateException("already added last");
       
       totalAdded++;
-      add(0, key, data, offset, compressedSize, rawSize);
-      flush(0, key, true);
+      add(0, key, minTimestamp, maxTimestamp, minimumVisibility, data, offset, compressedSize, rawSize, true, version);
       addedLast = true;
       
     }
@@ -487,215 +523,196 @@ public class MultiLevelIndex {
     private int version;
     private int size;
     
-    public class Node {
-      
-      private Node parent;
-      private IndexBlock indexBlock;
-      private int currentPos;
-      
-      Node(Node parent, IndexBlock iBlock) {
-        this.parent = parent;
-        this.indexBlock = iBlock;
-      }
+    class StackEntry {
+      public final IndexBlock block;
+      public int offset;
       
-      Node(IndexBlock rootInfo) {
-        this.parent = null;
-        this.indexBlock = rootInfo;
-      }
-      
-      private Node lookup(Key key) throws IOException {
-        int pos = Collections.binarySearch(indexBlock.getKeyIndex(), key, new Comparator<Key>() {
-          @Override
-          public int compare(Key o1, Key o2) {
-            return o1.compareTo(o2);
-          }
-        });
-        
-        if (pos < 0)
-          pos = (pos * -1) - 1;
-        
-        if (pos == indexBlock.getIndex().size()) {
-          if (parent != null)
-            throw new IllegalStateException();
-          this.currentPos = pos;
-          return this;
-        }
-        
-        this.currentPos = pos;
-        
-        if (indexBlock.getLevel() == 0) {
-          return this;
-        }
-        
-        IndexEntry ie = indexBlock.getIndex().get(pos);
-        Node child = new Node(this, getIndexBlock(ie));
-        return child.lookup(key);
-      }
-      
-      private Node getLast() throws IOException {
-        currentPos = indexBlock.getIndex().size() - 1;
-        if (indexBlock.getLevel() == 0)
-          return this;
-        
-        IndexEntry ie = indexBlock.getIndex().get(currentPos);
-        Node child = new Node(this, getIndexBlock(ie));
-        return child.getLast();
-      }
-      
-      private Node getFirst() throws IOException {
-        currentPos = 0;
-        if (indexBlock.getLevel() == 0)
-          return this;
-        
-        IndexEntry ie = indexBlock.getIndex().get(currentPos);
-        Node child = new Node(this, getIndexBlock(ie));
-        return child.getFirst();
-      }
-      
-      private Node getPrevious() throws IOException {
-        if (currentPos == 0)
-          return parent.getPrevious();
-        
-        currentPos--;
-        
-        IndexEntry ie = indexBlock.getIndex().get(currentPos);
-        Node child = new Node(this, getIndexBlock(ie));
-        return child.getLast();
-        
-      }
-      
-      private Node getNext() throws IOException {
-        if (currentPos == indexBlock.getIndex().size() - 1)
-          return parent.getNext();
-        
-        currentPos++;
-        
-        IndexEntry ie = indexBlock.getIndex().get(currentPos);
-        Node child = new Node(this, getIndexBlock(ie));
-        return child.getFirst();
-        
-      }
-      
-      Node getNextNode() throws IOException {
-        return parent.getNext();
-      }
-      
-      Node getPreviousNode() throws IOException {
-        return parent.getPrevious();
+      public StackEntry(IndexBlock block, int offset) {
+        this.block = block;
+        this.offset = offset;
       }
     }
     
-    public class IndexIterator implements ListIterator<IndexEntry> {
-      
-      private Node node;
-      private ListIterator<IndexEntry> liter;
+    class IndexIterator implements Iterator<IndexEntry> {
+      private Stack<StackEntry> position = new Stack<StackEntry>();
+      private final TimestampRangePredicate timestampFilter;
       
-      private Node getPrevNode() {
+      private IndexIterator(TimestampRangePredicate timestampFilter, Key lookupKey) {
+        this.timestampFilter = timestampFilter;
         try {
-          return node.getPreviousNode();
+          seek(lookupKey);
         } catch (IOException e) {
           throw new RuntimeException(e);
         }
       }
       
-      private Node getNextNode() {
-        try {
-          return node.getNextNode();
-        } catch (IOException e) {
-          throw new RuntimeException(e);
+      private final boolean checkFilterIndexEntry(IndexEntry ie) {
+        if(timestampFilter == null)
+        if (timestampFilter != null && (ie.maxTimestamp < timestampFilter.startTimestamp || ie.minTimestamp > timestampFilter.endTimestamp)) {
+          return false;
+        }
+        return true;
+      }
+      
+      private void seek(Key lookupKey) throws IOException {
+        StackEntry top = new StackEntry(rootBlock, -1);
+        position.add(top);
+        while (true) {
+          top = position.peek();
+          // go down the tree
+          int pos = Collections.binarySearch(top.block.getKeyIndex(), lookupKey, new Comparator<Key>() {
+            @Override
+            public int compare(Key o1, Key o2) {
+              return o1.compareTo(o2);
+            }
+          });
+          
+          
+          if (pos < 0) {
+            pos = (pos * -1) - 1;
+          } else if (pos < top.block.getKeyIndex().size()) {
+            // the exact key was found, so we want to go back to the first identical match
+            while (pos > 0 && top.block.getKeyIndex().get(pos - 1).equals(lookupKey)) {
+              pos--;
+            }
+          }
+          
+
+          IndexEntry ie = null;
+          List<IndexEntry> index = top.block.getIndex();
+          
+          if(pos > 0)
+          {
+            // look backwards to find any initial previousEntry that might match the timestamp range such that no entry within the given timestamp range is between the seeked key and the previousKey
+            previousEntry = index.get(pos-1);
+            // TODO: find the offset for this block
+            previousIndex = Integer.MIN_VALUE;
+          }
+          
+          while (pos < index.size()) {
+            ie = index.get(pos);
+            // filter on timestampRange by skipping forward until a block passes the predicate
+            if (checkFilterIndexEntry(ie))
+              break;
+            pos++;
+          }
+          
+          
+          if (pos == index.size()) {
+            position.pop();
+            goToNext();
+            return;
+          } else {
+            if (top.block.level == 0) {
+              // found a matching index entry
+              top.offset = pos - 1;
+              return;
+            } else {
+              top.offset = pos;
+              position.add(new StackEntry(getIndexBlock(ie), 0));
+            }
+          }
         }
       }
       
-      public IndexIterator() {
-        node = null;
-      }
-      
-      public IndexIterator(Node node) {
-        this.node = node;
-        liter = node.indexBlock.getIndex().listIterator(node.currentPos);
-      }
-      
-      @Override
-      public boolean hasNext() {
-        if (node == null)
-          return false;
-        
-        if (!liter.hasNext()) {
-          return node.indexBlock.hasNext();
-        } else {
-          return true;
+      private void goToNext() throws IOException {
+        int numSkippedBlocks = 0;
+        // traverse the index tree forwards
+        while (position.isEmpty() == false) {
+          StackEntry top = position.peek();
+          top.offset++;
+          List<IndexEntry> index = top.block.getIndex();
+          while (top.offset < index.size()) {
+            if (checkFilterIndexEntry(index.get(top.offset)))
+              break;
+            numSkippedBlocks++;
+            top.offset++;
+          }
+          if (top.offset == index.size()) {
+            // go up
+            position.pop();
+          } else {
+            if (top.block.level == 0) {
+              // success!
+              return;
+            }
+            // go down
+            position.add(new StackEntry(getIndexBlock(index.get(top.offset)), -1));
+          }
         }
-        
       }
       
-      public IndexEntry peekPrevious() {
-        IndexEntry ret = previous();
-        next();
-        return ret;
-      }
+      IndexEntry nextEntry = null;
+      IndexEntry previousEntry = null;
+      int nextIndex = -1;
+      int previousIndex = -1;
       
-      public IndexEntry peek() {
-        IndexEntry ret = next();
-        previous();
-        return ret;
+      private void prepNext() {
+        if (nextEntry == null) {
+          try {
+            goToNext();
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+          if (position.isEmpty())
+            return;
+          StackEntry e = position.peek();
+          nextEntry = e.block.getIndex().get(e.offset);
+          nextIndex = e.block.getOffset() + e.offset;
+        }
       }
       
-      @Override
-      public IndexEntry next() {
-        if (!liter.hasNext()) {
-          node = getNextNode();
-          liter = node.indexBlock.getIndex().listIterator();
-        }
+      public boolean hasNext() {
+        if (nextEntry == null)
+          prepNext();
+        return nextEntry != null;
         
-        return liter.next();
       }
       
-      @Override
+      // initially, previous key is last key of the previous block
       public boolean hasPrevious() {
-        if (node == null)
-          return false;
-        
-        if (!liter.hasPrevious()) {
-          return node.indexBlock.getOffset() > 0;
-        } else {
-          return true;
-        }
+        return previousEntry != null;
       }
       
-      @Override
-      public IndexEntry previous() {
-        if (!liter.hasPrevious()) {
-          node = getPrevNode();
-          liter = node.indexBlock.getIndex().listIterator(node.indexBlock.getIndex().size());
-        }
-        
-        return liter.previous();
+      public int nextIndex() {
+        if (nextEntry == null)
+          prepNext();
+        return nextIndex;
       }
       
-      @Override
-      public int nextIndex() {
-        return node.indexBlock.getOffset() + liter.nextIndex();
+      public IndexEntry peek() {
+        if (nextEntry == null)
+          prepNext();
+        return nextEntry;
       }
       
-      @Override
-      public int previousIndex() {
-        return node.indexBlock.getOffset() + liter.previousIndex();
+      private int blocksReturned = 0;
+      
+      public IndexEntry next() {
+        prepNext();
+        previousEntry = nextEntry;
+        nextEntry = null;
+        previousIndex = nextIndex;
+        nextIndex = -1;
+        return previousEntry;
       }
       
-      @Override
-      public void remove() {
-        throw new UnsupportedOperationException();
+      public IndexEntry peekPrevious() {
+        return previousEntry;
       }
       
+      /*
+       * (non-Javadoc)
+       * 
+       * @see java.util.Iterator#remove()
+       */
       @Override
-      public void set(IndexEntry e) {
+      public void remove() {
         throw new UnsupportedOperationException();
-        
       }
       
-      @Override
-      public void add(IndexEntry e) {
-        throw new UnsupportedOperationException();
+      public int previousIndex() {
+        return previousIndex;
       }
       
     }
@@ -714,16 +731,15 @@ public class MultiLevelIndex {
       return iblock;
     }
     
-    public IndexIterator lookup(Key key) throws IOException {
-      Node node = new Node(rootBlock);
-      return new IndexIterator(node.lookup(key));
+    IndexIterator lookup(Key key) throws IOException {
+      return new IndexIterator(timestampRange, key);
     }
     
     public void readFields(DataInput in) throws IOException {
       
       size = 0;
       
-      if (version == RFile.RINDEX_VER_6) {
+      if (version == RFile.RINDEX_VER_6 || version == RFile.RINDEX_VER_7) {
         size = in.readInt();
       }
       
@@ -769,6 +785,15 @@ public class MultiLevelIndex {
     public Key getLastKey() {
       return rootBlock.getIndex().get(rootBlock.getIndex().size() - 1).getKey();
     }
+    
+    TimestampRangePredicate timestampRange;
+    
+    /**
+     * @param r
+     */
+    public void setTimestampRange(TimestampRangePredicate r) {
+      this.timestampRange = r;
+    }
   }
   
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fcd07de/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
index c2eac1d..06000f8 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
@@ -56,10 +56,14 @@ import org.apache.accumulo.core.file.rfile.MultiLevelIndex.IndexEntry;
 import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Reader.IndexIterator;
 import org.apache.accumulo.core.file.rfile.RelativeKey.MByteSequence;
 import org.apache.accumulo.core.file.rfile.bcfile.MetaBlockDoesNotExist;
+import org.apache.accumulo.core.iterators.Filterer;
 import org.apache.accumulo.core.iterators.IterationInterruptedException;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.Predicate;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.predicates.TimestampRangePredicate;
 import org.apache.accumulo.core.iterators.system.HeapIterator;
+import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -77,6 +81,7 @@ public class RFile {
   private RFile() {}
   
   private static final int RINDEX_MAGIC = 0x20637474;
+  static final int RINDEX_VER_7 = 7;
   static final int RINDEX_VER_6 = 6;
   static final int RINDEX_VER_4 = 4;
   static final int RINDEX_VER_3 = 3;
@@ -301,6 +306,11 @@ public class RFile {
     private int indexBlockSize;
     private int entries = 0;
     
+    // some aggregate stats to keep on a per-block basis
+    private long minTimestamp = Long.MAX_VALUE;
+    private long maxTimestamp = Long.MIN_VALUE;
+    private ColumnVisibility minimumVisibility = null;
+    
     private ArrayList<LocalityGroupMetadata> localityGroups = new ArrayList<LocalityGroupMetadata>();
     private LocalityGroupMetadata currentLocalityGroup = null;
     private int nextBlock = 0;
@@ -337,7 +347,7 @@ public class RFile {
       ABlockWriter mba = fileWriter.prepareMetaBlock("RFile.index");
       
       mba.writeInt(RINDEX_MAGIC);
-      mba.writeInt(RINDEX_VER_6);
+      mba.writeInt(RINDEX_VER_7);
       
       if (currentLocalityGroup != null)
         localityGroups.add(currentLocalityGroup);
@@ -368,8 +378,28 @@ public class RFile {
       }
     }
     
+    private void updateBlockStats(Key key, Value value)
+    {
+      if(minTimestamp > key.getTimestamp())
+        minTimestamp = key.getTimestamp();
+      if(maxTimestamp < key.getTimestamp())
+        maxTimestamp = key.getTimestamp();
+      if(minimumVisibility == null)
+        minimumVisibility = new ColumnVisibility(key.getColumnVisibility());
+      else
+        minimumVisibility = minimumVisibility.or(new ColumnVisibility(key.getColumnVisibility()));
+      entries++;
+    }
+    
+    private void clearBlockStats()
+    {
+      minTimestamp = Long.MAX_VALUE;
+      maxTimestamp = Long.MIN_VALUE;
+      minimumVisibility = null;      
+      entries = 0;
+    }
+    
     public void append(Key key, Value value) throws IOException {
-      
       if (dataClosed) {
         throw new IllegalStateException("Cannont append, data closed");
       }
@@ -395,7 +425,8 @@ public class RFile {
       
       rk.write(blockWriter);
       value.write(blockWriter);
-      entries++;
+      updateBlockStats(key,value);
+      
       
       prevKey = new Key(key);
       lastKeyInBlock = prevKey;
@@ -406,13 +437,13 @@ public class RFile {
       blockWriter.close();
       
       if (lastBlock)
-        currentLocalityGroup.indexWriter.addLast(key, entries, blockWriter.getStartPos(), blockWriter.getCompressedSize(), blockWriter.getRawSize());
+        currentLocalityGroup.indexWriter.addLast(key, minTimestamp, maxTimestamp, minimumVisibility, entries, blockWriter.getStartPos(), blockWriter.getCompressedSize(), blockWriter.getRawSize(), RINDEX_VER_7);
       else
-        currentLocalityGroup.indexWriter.add(key, entries, blockWriter.getStartPos(), blockWriter.getCompressedSize(), blockWriter.getRawSize());
+        currentLocalityGroup.indexWriter.add(key, minTimestamp, maxTimestamp, minimumVisibility, entries, blockWriter.getStartPos(), blockWriter.getCompressedSize(), blockWriter.getRawSize(), RINDEX_VER_7);
       
+      clearBlockStats();
       blockWriter = null;
       lastKeyInBlock = null;
-      entries = 0;
       nextBlock++;
     }
     
@@ -475,7 +506,7 @@ public class RFile {
     }
   }
   
-  private static class LocalityGroupReader implements FileSKVIterator {
+  private static class LocalityGroupReader implements FileSKVIterator, Filterer<Key,Value> {
     
     private BlockFileReader reader;
     private MultiLevelIndex.Reader index;
@@ -578,7 +609,7 @@ public class RFile {
           return;
         }
       }
-      
+
       prevKey = rk.getKey();
       rk.readFields(currBlock);
       val.readFields(currBlock);
@@ -650,14 +681,15 @@ public class RFile {
       boolean reseek = true;
       
       if (range.afterEndKey(firstKey)) {
-        // range is before first key in rfile, so there is nothing to do
+        // range is before first key in this locality group, so there is nothing to do
         reset();
         reseek = false;
       }
       
-      if (rk != null) {
+      // always reseek if the filter changed since the last seek
+      if (filterChanged == false && rk != null) {
         if (range.beforeStartKey(prevKey) && range.afterEndKey(getTopKey())) {
-          // range is between the two keys in the file where the last range seeked to stopped, so there is
+          // range is between the two keys in the locality group where the last range seeked to stopped, so there is
           // nothing to do
           reseek = false;
         }
@@ -702,12 +734,6 @@ public class RFile {
           // past the last key
         } else {
           
-          // if the index contains the same key multiple times, then go to the
-          // earliest index entry containing the key
-          while (iiter.hasPrevious() && iiter.peekPrevious().getKey().equals(iiter.peek().getKey())) {
-            iiter.previous();
-          }
-          
           if (iiter.hasPrevious())
             prevKey = new Key(iiter.peekPrevious().getKey()); // initially prevKey is the last key of the prev block
           else
@@ -771,9 +797,35 @@ public class RFile {
     public void setInterruptFlag(AtomicBoolean flag) {
       this.interruptFlag = flag;
     }
+    
+    private TimestampRangePredicate timestampRange;
+    private boolean filterChanged = false;
+
+    /* (non-Javadoc)
+     * @see org.apache.accumulo.core.iterators.Filterer#applyFilter(org.apache.accumulo.core.iterators.Predicate)
+     */
+    @Override
+    public void applyFilter(Predicate<Key,Value> filter) {
+      // TODO support general filters
+      if(filter instanceof TimestampRangePredicate)
+      {
+        filterChanged = true;
+        TimestampRangePredicate p = (TimestampRangePredicate)filter;
+        // intersect with previous timestampRange
+        if(timestampRange != null)
+          timestampRange = new TimestampRangePredicate(Math.max(p.startTimestamp, timestampRange.startTimestamp), Math.min(p.endTimestamp, timestampRange.endTimestamp));
+        else
+          timestampRange = p;
+        index.setTimestampRange(timestampRange);
+      }
+      else
+      {
+        throw new RuntimeException("yikes, not yet implemented");
+      }
+    }
   }
   
-  public static class Reader extends HeapIterator implements FileSKVIterator {
+  public static class Reader extends HeapIterator implements FileSKVIterator, Filterer<Key,Value> {
     
     private static final Collection<ByteSequence> EMPTY_CF_SET = Collections.emptySet();
     
@@ -799,7 +851,7 @@ public class RFile {
       
       if (magic != RINDEX_MAGIC)
         throw new IOException("Did not see expected magic number, saw " + magic);
-      if (ver != RINDEX_VER_6 && ver != RINDEX_VER_4 && ver != RINDEX_VER_3)
+      if (ver != RINDEX_VER_7 && ver != RINDEX_VER_6 && ver != RINDEX_VER_4 && ver != RINDEX_VER_3)
         throw new IOException("Did not see expected version, saw " + ver);
       
       int size = mb.readInt();
@@ -947,6 +999,9 @@ public class RFile {
     @Override
     public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
       
+      topKey = null;
+      topValue = null;
+      
       clear();
       
       numLGSeeked = 0;
@@ -1001,6 +1056,8 @@ public class RFile {
         }
         
         if (include) {
+          if(timestampFilter != null)
+            lgr.applyFilter(timestampFilter);
           lgr.seek(range, EMPTY_CF_SET, false);
           addSource(lgr);
           numLGSeeked++;
@@ -1047,6 +1104,94 @@ public class RFile {
         lgr.setInterruptFlag(interruptFlag);
       }
     }
+    
+    ArrayList<Predicate<Key,Value>> filters = new ArrayList<Predicate<Key,Value>>();
+    
+    TimestampRangePredicate timestampFilter = null;
+    
+    Key topKey;
+    Value topValue;
+    
+    /* (non-Javadoc)
+     * @see org.apache.accumulo.core.iterators.system.HeapIterator#hasTop()
+     */
+    @Override
+    public boolean hasTop() {
+      if(topKey == null)
+      {
+        while(super.hasTop())
+        {
+          topKey = super.getTopKey();
+          topValue = super.getTopValue();
+          // check all the filters to see if we found a valid key/value pair
+          boolean keep = true;
+          for(Predicate<Key,Value> filter: filters)
+          {
+            if(!filter.evaluate(topKey, topValue))
+            {
+              keep = false;
+              try {
+                super.next();
+              } catch (IOException e) {
+                throw new RuntimeException(e);
+              }
+              break;
+            }
+          }
+          if(keep == true)
+            return true;
+        }
+        // ran out of key/value pairs
+        topKey = null;
+        topValue = null;
+        return false;
+      }
+      else
+      {
+        return true;
+      }
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.accumulo.core.iterators.system.HeapIterator#next()
+     */
+    @Override
+    public void next() throws IOException {
+      topKey = null;
+      topValue = null;
+      super.next();
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.accumulo.core.iterators.system.HeapIterator#getTopKey()
+     */
+    @Override
+    public Key getTopKey() {
+      if(topKey == null)
+        hasTop();
+      return topKey;
+    }
+    
+    /* (non-Javadoc)
+     * @see org.apache.accumulo.core.iterators.system.HeapIterator#getTopValue()
+     */
+    @Override
+    public Value getTopValue() {
+      if(topValue == null)
+        hasTop();
+      return topValue;
+    }
+    
+    /* (non-Javadoc)
+     * @see org.apache.accumulo.core.iterators.Filterer#applyFilter(org.apache.accumulo.core.iterators.Predicate)
+     */
+    @Override
+    public void applyFilter(Predicate<Key,Value> filter) {
+      filters.add(filter);
+      // the HeapIterator will pass this filter on to its children, a collection of LocalityGroupReaders
+      if(filter instanceof TimestampRangePredicate)
+        this.timestampFilter = (TimestampRangePredicate)filter;
+    }
   }
   
   public static void main(String[] args) throws Exception {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fcd07de/core/src/main/java/org/apache/accumulo/core/iterators/Filterer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/Filterer.java b/core/src/main/java/org/apache/accumulo/core/iterators/Filterer.java
new file mode 100644
index 0000000..bda3665
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/Filterer.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators;
+
+/**
+ * 
+ */
+public interface Filterer<K,V> {
+  public void applyFilter(Predicate<K,V> filter);
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fcd07de/core/src/main/java/org/apache/accumulo/core/iterators/Predicate.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/Predicate.java b/core/src/main/java/org/apache/accumulo/core/iterators/Predicate.java
new file mode 100644
index 0000000..99a6e8b
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/Predicate.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators;
+
+/**
+ * Predicate<K,V> supports a single method that is used to evaluate an input (K,V) pair as true or false
+ */
+public interface Predicate<K,V> {
+  public boolean evaluate(K k, V v);
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fcd07de/core/src/main/java/org/apache/accumulo/core/iterators/predicates/TimestampRangePredicate.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/predicates/TimestampRangePredicate.java b/core/src/main/java/org/apache/accumulo/core/iterators/predicates/TimestampRangePredicate.java
new file mode 100644
index 0000000..eb5080b
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/predicates/TimestampRangePredicate.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators.predicates;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.Predicate;
+
+/**
+ * TimestampRangeFilter is used to determine whether a Key/Value pair falls within a timestamp range
+ */
+public class TimestampRangePredicate implements Predicate<Key,Value> {
+
+  public final long startTimestamp;
+  public final long endTimestamp;
+  
+  
+  /**
+   * @param startTimestamp - inclusive first allowable timestamp
+   * @param endTimestamp - inclusive last allowable timestamp
+   */
+  public TimestampRangePredicate(long startTimestamp, long endTimestamp) {
+    super();
+    this.startTimestamp = startTimestamp;
+    this.endTimestamp = endTimestamp;
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.accumulo.core.iterators.Predicate#evaluate(java.lang.Object, java.lang.Object)
+   */
+  /**
+   * return true IFF the key falls within the timestamp range
+   */
+  @Override
+  public boolean evaluate(Key k, Value v) {
+    long timestamp = k.getTimestamp();
+    return timestamp >= startTimestamp && timestamp <= endTimestamp;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fcd07de/core/src/main/java/org/apache/accumulo/core/iterators/system/HeapIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/system/HeapIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/system/HeapIterator.java
index e54f37c..72aa3e7 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/system/HeapIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/HeapIterator.java
@@ -17,9 +17,12 @@
 package org.apache.accumulo.core.iterators.system;
 
 import java.io.IOException;
+import java.util.ArrayList;
 
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.Filterer;
+import org.apache.accumulo.core.iterators.Predicate;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.commons.collections.buffer.PriorityBuffer;
 
@@ -55,28 +58,29 @@ public abstract class HeapIterator implements SortedKeyValueIterator<Key,Value>
   }
   
   @Override
-  final public Key getTopKey() {
+  public Key getTopKey() {
     return currentIter.getTopKey();
   }
   
   @Override
-  final public Value getTopValue() {
+  public Value getTopValue() {
     return currentIter.getTopValue();
   }
   
   @Override
-  final public boolean hasTop() {
+  public boolean hasTop() {
     return heap.size() > 0;
   }
   
   @Override
-  final public void next() throws IOException {
+  public void next() throws IOException {
     switch (heap.size()) {
       case 0:
         throw new IllegalStateException("Called next() when there is no top");
       case 1:
         // optimization for case when heap contains one entry,
         // avoids remove and add
+        // TODO apply the filters
         currentIter.next();
         if (!currentIter.hasTop()) {
           heap.remove();
@@ -85,6 +89,7 @@ public abstract class HeapIterator implements SortedKeyValueIterator<Key,Value>
         break;
       default:
         Index idx = (Index) heap.remove();
+        // TODO apply the filters
         idx.iter.next();
         if (idx.iter.hasTop()) {
           heap.add(idx);
@@ -111,5 +116,4 @@ public abstract class HeapIterator implements SortedKeyValueIterator<Key,Value>
     else
       currentIter = null;
   }
-  
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fcd07de/core/src/main/java/org/apache/accumulo/core/iterators/system/VisibilityFilter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/system/VisibilityFilter.java b/core/src/main/java/org/apache/accumulo/core/iterators/system/VisibilityFilter.java
index a41f7be..a4391c0 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/system/VisibilityFilter.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/VisibilityFilter.java
@@ -23,26 +23,22 @@ import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.accumulo.core.security.VisibilityEvaluator;
-import org.apache.accumulo.core.security.VisibilityParseException;
 import org.apache.accumulo.core.util.TextUtil;
 import org.apache.commons.collections.map.LRUMap;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 
 public class VisibilityFilter extends Filter {
-  private VisibilityEvaluator ve;
+  private Authorizations auths;
   private Text defaultVisibility;
   private LRUMap cache;
   private Text tmpVis;
   
   private static final Logger log = Logger.getLogger(VisibilityFilter.class);
   
-  public VisibilityFilter() {}
-  
   public VisibilityFilter(SortedKeyValueIterator<Key,Value> iterator, Authorizations authorizations, byte[] defaultVisibility) {
     setSource(iterator);
-    this.ve = new VisibilityEvaluator(authorizations);
+    this.auths = authorizations;
     this.defaultVisibility = new Text(defaultVisibility);
     this.cache = new LRUMap(1000);
     this.tmpVis = new Text();
@@ -50,7 +46,7 @@ public class VisibilityFilter extends Filter {
   
   @Override
   public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
-    return new VisibilityFilter(getSource().deepCopy(env), ve.getAuthorizations(), TextUtil.getBytes(defaultVisibility));
+    return new VisibilityFilter(getSource().deepCopy(env), auths, TextUtil.getBytes(defaultVisibility));
   }
   
   @Override
@@ -66,13 +62,8 @@ public class VisibilityFilter extends Filter {
     if (b != null)
       return b;
     
-    try {
-      Boolean bb = ve.evaluate(new ColumnVisibility(testVis));
-      cache.put(new Text(testVis), bb);
-      return bb;
-    } catch (VisibilityParseException e) {
-      log.error("Parse Error", e);
-      return false;
-    }
+    Boolean bb = new ColumnVisibility(testVis).evaluate(auths);
+    cache.put(new Text(testVis), bb);
+    return bb;
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fcd07de/core/src/main/java/org/apache/accumulo/core/security/ColumnVisibility.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/security/ColumnVisibility.java b/core/src/main/java/org/apache/accumulo/core/security/ColumnVisibility.java
index 5a825f2..1b72b33 100644
--- a/core/src/main/java/org/apache/accumulo/core/security/ColumnVisibility.java
+++ b/core/src/main/java/org/apache/accumulo/core/security/ColumnVisibility.java
@@ -16,16 +16,16 @@
  */
 package org.apache.accumulo.core.security;
 
-import java.util.ArrayList;
+import java.io.ByteArrayOutputStream;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
+import java.util.Iterator;
+import java.util.TreeSet;
 
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.util.BadArgumentException;
 import org.apache.accumulo.core.util.TextUtil;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparator;
 
 /**
  * Validate the column visibility is a valid expression and set the visibility for a Mutation. See {@link ColumnVisibility#ColumnVisibility(byte[])} for the
@@ -33,117 +33,190 @@ import org.apache.hadoop.io.WritableComparator;
  */
 public class ColumnVisibility {
   
-  Node node = null;
-  private byte[] expression;
-  
-  /**
-   * Accessor for the underlying byte string.
-   * 
-   * @return byte array representation of a visibility expression
-   */
-  public byte[] getExpression() {
-    return expression;
-  }
+  private Node node = null;
   
   public static enum NodeType {
     TERM, OR, AND,
   }
-  
-  public static class Node {
-    public final static List<Node> EMPTY = Collections.emptyList();
-    NodeType type;
-    int start = 0;
-    int end = 0;
-    List<Node> children = EMPTY;
+
+  private static abstract class Node implements Comparable<Node> {
+    protected final NodeType type;
     
-    public Node(NodeType type) {
+    public Node(NodeType type)
+    {
       this.type = type;
     }
-    
-    public Node(int start, int end) {
-      this.type = NodeType.TERM;
-      this.start = start;
-      this.end = end;
+
+    public byte[] generate() {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      generate(baos,false);
+      return baos.toByteArray();
     }
     
-    public void add(Node child) {
-      if (children == EMPTY)
-        children = new ArrayList<Node>();
-      
-      children.add(child);
-    }
+    public abstract boolean evaluate(Authorizations auths);
+    
+    protected abstract void generate(ByteArrayOutputStream baos, boolean parens);
+  }
+  
+  private static class TermNode extends Node {
+    
+    final ByteSequence bs;
     
-    public NodeType getType() {
-      return type;
+    public TermNode(final ByteSequence bs) {
+      super(NodeType.TERM);
+      this.bs = bs;
     }
     
-    public List<Node> getChildren() {
-      return children;
+    public boolean evaluate(Authorizations auths)
+    {
+      return auths.contains(bs);
+    }
+
+
+    protected void generate(ByteArrayOutputStream baos, boolean parens)
+    {
+      baos.write(bs.getBackingArray(), bs.offset(), bs.length());
     }
     
-    public int getTermStart() {
-      return start;
+    @Override
+    public boolean equals(Object other) {
+      if(other instanceof TermNode)
+      {
+        return bs.compareTo(((TermNode)other).bs) == 0;
+      }
+      return false;
     }
     
-    public int getTermEnd() {
-      return end;
+    @Override
+    public int compareTo(Node o) {
+      if(o.type == NodeType.TERM)
+      {
+        return bs.compareTo(((TermNode)o).bs);
+      }
+      return type.ordinal() - o.type.ordinal();
     }
   }
   
-  public static class NodeComparator implements Comparator<Node> {
+  private abstract static class AggregateNode extends Node {
+
+    /**
+     * @param type
+     */
+    public AggregateNode(NodeType type) {
+      super(type);
+    }
     
-    byte[] text;
+    protected TreeSet<Node> children = new TreeSet<Node>();
     
-    NodeComparator(byte[] text) {
-      this.text = text;
+    protected abstract byte getOperator();
+    
+    @Override
+    protected void generate(ByteArrayOutputStream baos, boolean parens) {
+      if(parens)
+        baos.write('(');
+      boolean first = true;
+      for(Node child:children)
+      {
+        if(!first)
+          baos.write(getOperator());
+        child.generate(baos, true);
+        first = false;
+      }
+      if(parens)
+        baos.write(')');
     }
     
     @Override
-    public int compare(Node a, Node b) {
-      int diff = a.type.ordinal() - b.type.ordinal();
-      if (diff != 0)
-        return diff;
-      switch (a.type) {
-        case TERM:
-          return WritableComparator.compareBytes(text, a.start, a.end - a.start, text, b.start, b.end - b.start);
-        case OR:
-        case AND:
-          diff = a.children.size() - b.children.size();
-          if (diff != 0)
-            return diff;
-          for (int i = 0; i < a.children.size(); i++) {
-            diff = compare(a.children.get(i), b.children.get(i));
-            if (diff != 0)
-              return diff;
-          }
+    public int compareTo(Node o) {
+      int ordinalDiff = type.ordinal() - o.type.ordinal();
+      if(ordinalDiff != 0)
+        return ordinalDiff;
+      AggregateNode other = (AggregateNode)o;
+      int childCountDifference = children.size() - other.children.size();
+      if(childCountDifference != 0)
+        return childCountDifference;
+      Iterator<Node> otherChildren = other.children.iterator();
+      for(Node n1:children)
+      {
+        int comp = n1.compareTo(otherChildren.next());
+        if(comp != 0)
+          return comp;
       }
       return 0;
     }
+
   }
   
-  static private void flatten(Node root, byte[] expression, StringBuilder out) {
-    if (root.type == NodeType.TERM)
-      out.append(new String(expression, root.start, root.end - root.start));
-    else {
-      String sep = "";
-      Collections.sort(root.children, new NodeComparator(expression));
-      for (Node c : root.children) {
-        out.append(sep);
-        boolean parens = (c.type != NodeType.TERM && root.type != c.type);
-        if (parens)
-          out.append("(");
-        flatten(c, expression, out);
-        if (parens)
-          out.append(")");
-        sep = root.type == NodeType.AND ? "&" : "|";
-      }
+  private static class OrNode extends AggregateNode {
+
+    public OrNode() {
+      super(NodeType.OR);
+    }
+
+    @Override
+    public boolean evaluate(Authorizations auths) {
+      for(Node child:children)
+        if(child.evaluate(auths))
+          return true;
+      return false;
+    }
+
+    @Override
+    protected byte getOperator() {
+      return '|';
     }
+    
+  }
+  
+  private static class AndNode extends AggregateNode {
+
+    public AndNode()
+    {
+      super(NodeType.AND);
+    }
+    
+    @Override
+    public boolean evaluate(Authorizations auths) {
+      for(Node child:children)
+        if(!child.evaluate(auths))
+          return false;
+      return true;
+    }
+
+    @Override
+    protected byte getOperator() {
+      return '&';
+    }
+    
   }
+
+  private byte[] expression = null;
   
+  /**
+   * @deprecated
+   * @see org.apache.accumulo.security.ColumnVisibility#getExpression()
+   */
   public byte[] flatten() {
-    StringBuilder builder = new StringBuilder();
-    flatten(node, expression, builder);
-    return builder.toString().getBytes();
+    return getExpression();
+  }
+  
+  /**
+   * Generate the byte[] that represents this ColumnVisibility.
+   * @return a byte[] representation of this visibility
+   */
+  public byte[] getExpression(){
+    if(expression != null)
+      return expression;
+    expression = _flatten();
+    return expression;
+  }
+  
+  private static final byte[] emptyExpression = new byte[0];
+  
+  private byte[] _flatten() {
+    if(node == null)
+      return emptyExpression;
+    return node.generate();
   }
   
   private static class ColumnVisibilityParser {
@@ -170,7 +243,7 @@ public class ColumnVisibility {
       if (start != end) {
         if (expr != null)
           throw new BadArgumentException("expression needs | or &", new String(expression), start);
-        return new Node(start, end);
+        return new TermNode(new ArrayByteSequence(expression, start, end - start));
       }
       if (expr == null)
         throw new BadArgumentException("empty term", new String(expression), start);
@@ -189,9 +262,9 @@ public class ColumnVisibility {
               if (!result.type.equals(NodeType.AND))
                 throw new BadArgumentException("cannot mix & and |", new String(expression), index - 1);
             } else {
-              result = new Node(NodeType.AND);
+              result = new AndNode();
             }
-            result.add(expr);
+            ((AggregateNode)result).children.add(expr);
             expr = null;
             termStart = index;
             break;
@@ -202,9 +275,9 @@ public class ColumnVisibility {
               if (!result.type.equals(NodeType.OR))
                 throw new BadArgumentException("cannot mix | and &", new String(expression), index - 1);
             } else {
-              result = new Node(NodeType.OR);
+              result = new OrNode();
             }
-            result.add(expr);
+            ((AggregateNode)result).children.add(expr);
             expr = null;
             termStart = index;
             break;
@@ -225,11 +298,21 @@ public class ColumnVisibility {
             if (result == null)
               return child;
             if (result.type == child.type)
-              for (Node c : child.children)
-                result.add(c);
+            {
+              AggregateNode parenNode = (AggregateNode)child;
+              for (Node c : parenNode.children)
+                ((AggregateNode)result).children.add(c);
+            }
             else
-              result.add(child);
-            result.end = index - 1;
+              ((AggregateNode)result).children.add(child);
+            if (result.type != NodeType.TERM)
+            {
+              AggregateNode resultNode = (AggregateNode)result;
+              if (resultNode.children.size() == 1)
+                return resultNode.children.first();
+              if (resultNode.children.size() < 2)
+                throw new BadArgumentException("missing term", new String(expression), index);
+            }
             return result;
           }
           default: {
@@ -241,12 +324,24 @@ public class ColumnVisibility {
       }
       Node child = processTerm(termStart, index, expr, expression);
       if (result != null)
-        result.add(child);
+      {
+        if(result.type == child.type)
+        {
+          ((AggregateNode)result).children.addAll(((AggregateNode)child).children);
+        }
+        else
+          ((AggregateNode)result).children.add(child);
+      }
       else
         result = child;
       if (result.type != NodeType.TERM)
-        if (result.children.size() < 2)
+      {
+        AggregateNode resultNode = (AggregateNode)result;
+        if (resultNode.children.size() == 1)
+          return resultNode.children.first();
+        if (resultNode.children.size() < 2)
           throw new BadArgumentException("missing term", new String(expression), index);
+      }
       return result;
     }
   }
@@ -256,14 +351,12 @@ public class ColumnVisibility {
       ColumnVisibilityParser p = new ColumnVisibilityParser();
       node = p.parse(expression);
     }
-    this.expression = expression;
   }
   
   /**
    * Empty visibility. Normally, elements with empty visibility can be seen by everyone. Though, one could change this behavior with filters.
    */
   public ColumnVisibility() {
-    expression = new byte[0];
   }
   
   /**
@@ -279,6 +372,10 @@ public class ColumnVisibility {
     this(TextUtil.getBytes(expression));
   }
   
+  private ColumnVisibility(Node node) {
+    this.node = node;
+  }
+  
   /**
    * Set the column visibility for a Mutation.
    * 
@@ -313,7 +410,7 @@ public class ColumnVisibility {
   
   @Override
   public String toString() {
-    return "[" + new String(expression) + "]";
+    return "[" + new String(this.getExpression()) + "]";
   }
   
   /**
@@ -329,16 +426,55 @@ public class ColumnVisibility {
   /**
    * Compares two ColumnVisibilities for string equivalence, not as a meaningful comparison of terms and conditions.
    */
-  public boolean equals(ColumnVisibility otherLe) {
-    return Arrays.equals(expression, otherLe.expression);
-  }
+//  public boolean equals(ColumnVisibility otherLe) {
+//    return Arrays.equals(expression, otherLe.expression);
+//  }
   
   @Override
   public int hashCode() {
-    return Arrays.hashCode(expression);
+    return Arrays.hashCode(getExpression());
   }
   
-  public Node getParseTree() {
-    return node;
+  public boolean evaluate(Authorizations auths) {
+    if(node == null)
+      return true;
+    return node.evaluate(auths);
   }
+  
+  public ColumnVisibility or(ColumnVisibility other)
+  {
+    if(node == null)
+      return this;
+    if(other.node == null)
+      return other;
+    OrNode orNode = new OrNode();
+    if(other.node instanceof OrNode)
+      orNode.children.addAll(((OrNode)other.node).children);
+    else
+      orNode.children.add(other.node);
+    if(node instanceof OrNode)
+      orNode.children.addAll(((OrNode)node).children);
+    else
+      orNode.children.add(node);
+    return new ColumnVisibility(orNode);
+  }
+  
+  public ColumnVisibility and(ColumnVisibility other)
+  {
+    if(node == null)
+      return other;
+    if(other.node == null)
+      return this;
+    AndNode andNode = new AndNode();
+    if(other.node instanceof AndNode)
+      andNode.children.addAll(((AndNode)other.node).children);
+    else
+      andNode.children.add(other.node);
+    if(node instanceof AndNode)
+      andNode.children.addAll(((AndNode)node).children);
+    else
+      andNode.children.add(node);
+    return new ColumnVisibility(andNode);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fcd07de/core/src/main/java/org/apache/accumulo/core/security/VisibilityConstraint.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/security/VisibilityConstraint.java b/core/src/main/java/org/apache/accumulo/core/security/VisibilityConstraint.java
index c8b33ba..1df543f 100644
--- a/core/src/main/java/org/apache/accumulo/core/security/VisibilityConstraint.java
+++ b/core/src/main/java/org/apache/accumulo/core/security/VisibilityConstraint.java
@@ -47,7 +47,7 @@ public class VisibilityConstraint implements Constraint {
     if (updates.size() > 1)
       ok = new HashSet<String>();
     
-    VisibilityEvaluator ve = null;
+    Authorizations auths = env.getAuthorizations();
     
     for (ColumnUpdate update : updates) {
       
@@ -59,16 +59,11 @@ public class VisibilityConstraint implements Constraint {
         
         try {
           
-          if (ve == null)
-            ve = new VisibilityEvaluator(env.getAuthorizations());
-          
-          if (!ve.evaluate(new ColumnVisibility(cv)))
+          if (!new ColumnVisibility(cv).evaluate(auths))
             return Collections.singletonList(new Short((short) 2));
           
         } catch (BadArgumentException bae) {
           return Collections.singletonList(new Short((short) 1));
-        } catch (VisibilityParseException e) {
-          return Collections.singletonList(new Short((short) 1));
         }
         
         if (ok != null)

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fcd07de/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
index c5e2501..3da616d 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
@@ -32,6 +32,7 @@ import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Reader;
 import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Reader.IndexIterator;
 import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Writer;
 import org.apache.accumulo.core.file.rfile.RFileTest.SeekableByteArrayInputStream;
+import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -58,9 +59,9 @@ public class MultiLevelIndexTest extends TestCase {
     BufferedWriter mliw = new BufferedWriter(new Writer(_cbw, maxBlockSize));
     
     for (int i = 0; i < num; i++)
-      mliw.add(new Key(String.format("%05d000", i)), i, 0, 0, 0);
+      mliw.add(new Key(String.format("%05d000", i)), 0l, 0l, new ColumnVisibility(), i, 0, 0, 0, RFile.RINDEX_VER_7);
     
-    mliw.addLast(new Key(String.format("%05d000", num)), num, 0, 0, 0);
+    mliw.addLast(new Key(String.format("%05d000", num)), 0l, 0l, new ColumnVisibility(), num, 0, 0, 0, RFile.RINDEX_VER_7);
     
     ABlockWriter root = _cbw.prepareMetaBlock("root");
     mliw.close(root);
@@ -75,7 +76,7 @@ public class MultiLevelIndexTest extends TestCase {
     FSDataInputStream in = new FSDataInputStream(bais);
     CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(in, data.length, CachedConfiguration.getInstance());
     
-    Reader reader = new Reader(_cbr, RFile.RINDEX_VER_6);
+    Reader reader = new Reader(_cbr, RFile.RINDEX_VER_7);
     BlockRead rootIn = _cbr.getMetaBlock("root");
     reader.readFields(rootIn);
     rootIn.close();
@@ -90,15 +91,6 @@ public class MultiLevelIndexTest extends TestCase {
     
     assertEquals(num + 1, count);
     
-    while (liter.hasPrevious()) {
-      count--;
-      assertEquals(count, liter.previousIndex());
-      assertEquals(count, liter.peekPrevious().getNumEntries());
-      assertEquals(count, liter.previous().getNumEntries());
-    }
-    
-    assertEquals(0, count);
-    
     // go past the end
     liter = reader.lookup(new Key(String.format("%05d000", num + 1)));
     assertFalse(liter.hasNext());

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fcd07de/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
index ed7cf7b..71f5c6c 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
@@ -24,8 +24,10 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.Random;
 import java.util.Set;
 
 import junit.framework.TestCase;
@@ -51,8 +53,11 @@ import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
+import org.junit.Test;
 
-public class RFileTest extends TestCase {
+import static org.junit.Assert.*;
+
+public class RFileTest {
   
   private static final Collection<ByteSequence> EMPTY_COL_FAMS = new ArrayList<ByteSequence>();
   
@@ -206,6 +211,7 @@ public class RFileTest extends TestCase {
     return String.format(prefix + "%06d", i);
   }
   
+  @Test
   public void test1() throws IOException {
     
     // test an emprt file
@@ -224,6 +230,7 @@ public class RFileTest extends TestCase {
     trf.closeReader();
   }
   
+  @Test
   public void test2() throws IOException {
     
     // test an rfile with one entry
@@ -260,6 +267,7 @@ public class RFileTest extends TestCase {
     trf.closeReader();
   }
   
+  @Test
   public void test3() throws IOException {
     
     // test an rfile with multiple rows having multiple columns
@@ -403,6 +411,7 @@ public class RFileTest extends TestCase {
     assertFalse(evi.hasNext());
   }
   
+  @Test
   public void test4() throws IOException {
     TestRFile trf = new TestRFile();
     
@@ -445,6 +454,7 @@ public class RFileTest extends TestCase {
     }
   }
   
+  @Test
   public void test5() throws IOException {
     
     TestRFile trf = new TestRFile();
@@ -473,6 +483,7 @@ public class RFileTest extends TestCase {
     trf.closeReader();
   }
   
+  @Test
   public void test6() throws IOException {
     
     TestRFile trf = new TestRFile();
@@ -505,6 +516,7 @@ public class RFileTest extends TestCase {
     trf.closeReader();
   }
   
+  @Test
   public void test7() throws IOException {
     // these test excercise setting the end key of a range
     
@@ -556,6 +568,7 @@ public class RFileTest extends TestCase {
     trf.reader.close();
   }
   
+  @Test
   public void test8() throws IOException {
     TestRFile trf = new TestRFile();
     
@@ -672,6 +685,7 @@ public class RFileTest extends TestCase {
     return cfs;
   }
   
+  @Test
   public void test9() throws IOException {
     TestRFile trf = new TestRFile();
     
@@ -813,6 +827,7 @@ public class RFileTest extends TestCase {
     
   }
   
+  @Test
   public void test10() throws IOException {
     
     // test empty locality groups
@@ -941,6 +956,7 @@ public class RFileTest extends TestCase {
     trf.closeReader();
   }
   
+  @Test
   public void test11() throws IOException {
     // test locality groups with more than two entries
     
@@ -1045,6 +1061,7 @@ public class RFileTest extends TestCase {
     trf.closeReader();
   }
   
+  @Test
   public void test12() throws IOException {
     // test inserting column fams not in locality groups
     
@@ -1076,6 +1093,7 @@ public class RFileTest extends TestCase {
     
   }
   
+  @Test
   public void test13() throws IOException {
     // test inserting column fam in default loc group that was in
     // previous locality group
@@ -1117,6 +1135,7 @@ public class RFileTest extends TestCase {
     
   }
   
+  @Test
   public void test14() throws IOException {
     // test starting locality group after default locality group was started
     
@@ -1142,6 +1161,7 @@ public class RFileTest extends TestCase {
     trf.writer.close();
   }
   
+  @Test
   public void test16() throws IOException {
     TestRFile trf = new TestRFile();
     
@@ -1160,6 +1180,7 @@ public class RFileTest extends TestCase {
     trf.closeWriter();
   }
   
+  @Test
   public void test17() throws IOException {
     // add alot of the same keys to rfile that cover multiple blocks...
     // this should cause the keys in the index to be exactly the same...
@@ -1298,6 +1319,7 @@ public class RFileTest extends TestCase {
     assertEquals(nonExcluded, colFamsSeen);
   }
   
+  @Test
   public void test18() throws IOException {
     // test writing more column families to default LG than it will track
     
@@ -1349,6 +1371,7 @@ public class RFileTest extends TestCase {
     trf.closeReader();
   }
   
+  @Test
   public void test19() throws IOException {
     // test RFile metastore
     TestRFile trf = new TestRFile();
@@ -1401,6 +1424,7 @@ public class RFileTest extends TestCase {
     trf.closeReader();
   }
   
+  @Test
   public void testOldVersions() throws Exception {
     runVersionTest(3);
     runVersionTest(4);
@@ -1459,4 +1483,23 @@ public class RFileTest extends TestCase {
     
     reader.close();
   }
+  
+  @Test
+  public void testSingleKeyBlocks() throws IOException
+  {
+    byte[] bytes = new byte[2000];
+    Random r = new Random();
+    r.nextBytes(bytes);
+    TestRFile trf = new TestRFile();
+    trf.openWriter(false);
+    Value vBig = new Value(bytes);
+    trf.writer.startNewLocalityGroup("one", Collections.singleton((ByteSequence)(new ArrayByteSequence("one"))));
+    trf.writer.append(new Key("r1","one"), vBig);
+    trf.writer.append(new Key("r2","one"), vBig);
+    trf.writer.startNewLocalityGroup("two", Collections.singleton((ByteSequence)(new ArrayByteSequence("two"))));
+    trf.writer.append(new Key("r1","two"), vBig);
+    trf.writer.append(new Key("r2","two"), vBig);
+    trf.writer.close();
+    
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fcd07de/core/src/test/java/org/apache/accumulo/core/file/rfile/TimestampFilterTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/TimestampFilterTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/TimestampFilterTest.java
new file mode 100644
index 0000000..c58f924
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/TimestampFilterTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import static org.junit.Assert.*;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.Collections;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
+import org.apache.accumulo.core.file.rfile.RFileTest.SeekableByteArrayInputStream;
+import org.apache.accumulo.core.iterators.Predicate;
+import org.apache.accumulo.core.iterators.predicates.TimestampRangePredicate;
+import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.junit.Test;
+
+public class TimestampFilterTest {
+  
+  @Test
+  public void testRFileTimestampFiltering() throws Exception {
+    // TODO create an RFile with increasing timestamp and random key order
+    Predicate<Key,Value> timeRange = new TimestampRangePredicate(100, 110);
+    int expected = 0;
+    Random r = new Random();
+    Configuration conf = new Configuration();
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    FSDataOutputStream dos = new FSDataOutputStream(baos, new FileSystem.Statistics("a"));
+    CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(dos, "gz", conf);
+    RFile.Writer writer = new RFile.Writer(_cbw, 1000, 1000);
+    writer.startDefaultLocalityGroup();
+    byte [] row = new byte[10];
+    byte [] colFam = new byte[10];
+    byte [] colQual = new byte[10];
+    Value value = new Value(new byte[0]);
+    byte [] colVis = new byte[0];
+    TreeMap<Key,Value> inputBuffer = new TreeMap<Key,Value>();
+    for(int i = 0; i < 100000; i++)
+    {
+      r.nextBytes(row);
+      r.nextBytes(colFam);
+      r.nextBytes(colQual);
+      Key k = new Key(row,colFam,colQual,colVis,(long)i);
+      if(timeRange.evaluate(k, value))
+        expected++;
+      inputBuffer.put(k, value);
+    }
+    for(Entry<Key,Value> e:inputBuffer.entrySet())
+    {
+      writer.append(e.getKey(), e.getValue());
+    }
+    writer.close();
+
+    // scan the RFile to bring back keys in a given timestamp range
+    byte[] data = baos.toByteArray();
+    ByteArrayInputStream bais = new SeekableByteArrayInputStream(data);
+    FSDataInputStream in = new FSDataInputStream(bais);
+    CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(in, data.length, conf);
+    RFile.Reader reader = new RFile.Reader(_cbr);
+    int count = 0;
+    reader.applyFilter(timeRange);
+    reader.seek(new Range(), Collections.EMPTY_SET, false);
+    while(reader.hasTop())
+    {
+      count++;
+      assertTrue(timeRange.evaluate(reader.getTopKey(),reader.getTopValue()));
+      reader.next();
+    }
+    assertEquals(expected, count);
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fcd07de/core/src/test/java/org/apache/accumulo/core/iterators/user/IndexedDocIteratorTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/iterators/user/IndexedDocIteratorTest.java b/core/src/test/java/org/apache/accumulo/core/iterators/user/IndexedDocIteratorTest.java
index 5508a4d..4681a61 100644
--- a/core/src/test/java/org/apache/accumulo/core/iterators/user/IndexedDocIteratorTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/iterators/user/IndexedDocIteratorTest.java
@@ -24,8 +24,6 @@ import java.util.Map.Entry;
 import java.util.Random;
 import java.util.TreeMap;
 
-import junit.framework.TestCase;
-
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
@@ -40,8 +38,12 @@ import org.apache.accumulo.core.iterators.system.MultiIterator;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
 
-public class IndexedDocIteratorTest extends TestCase {
+public class IndexedDocIteratorTest {
   
   private static final Logger log = Logger.getLogger(IndexedDocIteratorTest.class);
   
@@ -171,7 +173,7 @@ public class IndexedDocIteratorTest extends TestCase {
   
   public void testNull() {}
   
-  @Override
+  @Before
   public void setUp() {
     Logger.getRootLogger().setLevel(Level.ERROR);
   }
@@ -179,6 +181,7 @@ public class IndexedDocIteratorTest extends TestCase {
   private static final int NUM_ROWS = 5;
   private static final int NUM_DOCIDS = 200;
   
+  @Test
   public void test1() throws IOException {
     columnFamilies = new Text[2];
     columnFamilies[0] = new Text("CC");
@@ -216,6 +219,7 @@ public class IndexedDocIteratorTest extends TestCase {
     cleanup();
   }
   
+  @Test
   public void test2() throws IOException {
     columnFamilies = new Text[3];
     columnFamilies[0] = new Text("A");
@@ -250,6 +254,7 @@ public class IndexedDocIteratorTest extends TestCase {
     cleanup();
   }
   
+  @Test
   public void test3() throws IOException {
     columnFamilies = new Text[6];
     columnFamilies[0] = new Text("C");
@@ -292,6 +297,7 @@ public class IndexedDocIteratorTest extends TestCase {
     cleanup();
   }
   
+  @Test
   public void test4() throws IOException {
     columnFamilies = new Text[3];
     boolean[] notFlags = new boolean[3];

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fcd07de/core/src/test/java/org/apache/accumulo/core/security/ColumnVisibilityTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/security/ColumnVisibilityTest.java b/core/src/test/java/org/apache/accumulo/core/security/ColumnVisibilityTest.java
index df1863a..d463f42 100644
--- a/core/src/test/java/org/apache/accumulo/core/security/ColumnVisibilityTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/security/ColumnVisibilityTest.java
@@ -16,8 +16,7 @@
  */
 package org.apache.accumulo.core.security;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
 
 import org.junit.Test;
 
@@ -64,13 +63,6 @@ public class ColumnVisibilityTest {
     shouldThrow("a*b");
   }
   
-  public void normalized(String... values) {
-    for (int i = 0; i < values.length; i += 2) {
-      ColumnVisibility cv = new ColumnVisibility(values[i].getBytes());
-      assertArrayEquals(cv.flatten(), values[i + 1].getBytes());
-    }
-  }
-  
   @Test
   public void testComplexCompound() {
     shouldNotThrow("(a|b)&(x|y)");
@@ -79,11 +71,61 @@ public class ColumnVisibilityTest {
     shouldNotThrow("(one&two)|(foo&bar)", "(one|foo)&three", "one|foo|bar", "(one|foo)|bar", "((one|foo)|bar)&two");
   }
   
+  public void normalized(String... values) {
+    for (int i = 0; i < values.length; i += 2) {
+      ColumnVisibility cv = new ColumnVisibility(values[i].getBytes());
+      assertArrayEquals(cv.getExpression(), values[i + 1].getBytes());
+    }
+  }
+  
   @Test
   public void testNormalization() {
     normalized("a", "a", "(a)", "a", "b|a", "a|b", "(b)|a", "a|b", "(b|(a|c))&x", "x&(a|b|c)", "(((a)))", "a");
+    normalized("a|a", "a", "a|(a&a)", "a", "(a&b)|(b&a)", "a&b");
+    normalized("a|(a|(a|b))","a|b");
+    normalized("a|(a|(a|a))","a");
+  }
+  
+  public void aOrBEqualC(String a, String b, String c)
+  {
+    ColumnVisibility cvA = new ColumnVisibility(a.getBytes());
+    ColumnVisibility cvB = new ColumnVisibility(b.getBytes());
+    ColumnVisibility cvC = cvA.or(cvB);
+    assertArrayEquals(cvC.getExpression(), c.getBytes());
+    // check that we didn't disturb the original ColumnVisibilities
+    assertArrayEquals(cvA.getExpression(), a.getBytes());
+    assertArrayEquals(cvB.getExpression(), b.getBytes());
+  }
+  
+  @Test
+  public void testDisjunction() {
+    aOrBEqualC("a", "b", "a|b");
+    aOrBEqualC("c|(a&b)", "b", "b|c|(a&b)");
+    aOrBEqualC("c|(a&b)", "a|c","a|c|(a&b)");
+    aOrBEqualC("a&b","c&d","(a&b)|(c&d)");
+    aOrBEqualC("a","","");
   }
   
+  public void aAndBEqualC(String a, String b, String c)
+  {
+    ColumnVisibility cvA = new ColumnVisibility(a.getBytes());
+    ColumnVisibility cvB = new ColumnVisibility(b.getBytes());
+    ColumnVisibility cvC = cvA.and(cvB);
+    assertArrayEquals(cvC.getExpression(), c.getBytes());
+    // check that we didn't disturb the original ColumnVisibilities
+    assertArrayEquals(cvA.getExpression(), a.getBytes());
+    assertArrayEquals(cvB.getExpression(), b.getBytes());
+  }
+  
+  @Test
+  public void testConjunction() {
+    aAndBEqualC("a", "b", "a&b");
+    aAndBEqualC("a&b", "c", "a&b&c");
+    aAndBEqualC("a&(b|(c&d))", "e&(b|(c&d))","a&e&(b|(c&d))");
+    aAndBEqualC("a|b","c|d","(a|b)&(c|d)");
+    aAndBEqualC("a","","a");
+  }
+
   @Test
   public void testDanglingOperators() {
     shouldThrow("a|b&");

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fcd07de/core/src/test/java/org/apache/accumulo/core/security/VisibilityEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/security/VisibilityEvaluatorTest.java b/core/src/test/java/org/apache/accumulo/core/security/VisibilityEvaluatorTest.java
index b5c2455..7612e15 100644
--- a/core/src/test/java/org/apache/accumulo/core/security/VisibilityEvaluatorTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/security/VisibilityEvaluatorTest.java
@@ -26,33 +26,33 @@ import org.junit.Test;
 public class VisibilityEvaluatorTest {
   
   @Test
-  public void testVisibilityEvaluator() throws VisibilityParseException {
-    VisibilityEvaluator ct = new VisibilityEvaluator(ByteArraySet.fromStrings("one", "two", "three", "four"));
+  public void testVisibilityEvaluator() {
+    Authorizations auths = new Authorizations(ByteArraySet.fromStrings("one", "two", "three", "four"));
     
     // test for and
-    assertTrue("'and' test", ct.evaluate(new ColumnVisibility("one&two")));
+    assertTrue("'and' test", new ColumnVisibility("one&two").evaluate(auths));
     
     // test for or
-    assertTrue("'or' test", ct.evaluate(new ColumnVisibility("foor|four")));
+    assertTrue("'or' test", new ColumnVisibility("foor|four").evaluate(auths));
     
     // test for and and or
-    assertTrue("'and' and 'or' test", ct.evaluate(new ColumnVisibility("(one&two)|(foo&bar)")));
+    assertTrue("'and' and 'or' test", new ColumnVisibility("(one&two)|(foo&bar)").evaluate(auths));
     
     // test for false negatives
     for (String marking : new String[] {"one", "one|five", "five|one", "(one)", "(one&two)|(foo&bar)", "(one|foo)&three", "one|foo|bar", "(one|foo)|bar",
         "((one|foo)|bar)&two"}) {
-      assertTrue(marking, ct.evaluate(new ColumnVisibility(marking)));
+      assertTrue(marking, new ColumnVisibility(marking).evaluate(auths));
     }
     
     // test for false positives
     for (String marking : new String[] {"five", "one&five", "five&one", "((one|foo)|bar)&goober"}) {
-      assertFalse(marking, ct.evaluate(new ColumnVisibility(marking)));
+      assertFalse(marking, new ColumnVisibility(marking).evaluate(auths));
     }
     
     // test missing separators; these should throw an exception
     for (String marking : new String[] {"one(five)", "(five)one", "(one)(two)", "a|(b(c))"}) {
       try {
-        ct.evaluate(new ColumnVisibility(marking));
+        new ColumnVisibility(marking).evaluate(auths);
         fail(marking + " failed to throw");
       } catch (Throwable e) {
         // all is good
@@ -62,7 +62,7 @@ public class VisibilityEvaluatorTest {
     // test unexpected separator
     for (String marking : new String[] {"&(five)", "|(five)", "(five)&", "five|", "a|(b)&", "(&five)", "(five|)"}) {
       try {
-        ct.evaluate(new ColumnVisibility(marking));
+        new ColumnVisibility(marking).evaluate(auths);
         fail(marking + " failed to throw");
       } catch (Throwable e) {
         // all is good
@@ -72,7 +72,7 @@ public class VisibilityEvaluatorTest {
     // test mismatched parentheses
     for (String marking : new String[] {"(", ")", "(a&b", "b|a)"}) {
       try {
-        ct.evaluate(new ColumnVisibility(marking));
+        new ColumnVisibility(marking).evaluate(auths);
         fail(marking + " failed to throw");
       } catch (Throwable e) {
         // all is good