Posted to commits@accumulo.apache.org by ct...@apache.org on 2014/04/09 19:57:57 UTC

[26/64] [abbrv] Merge branch '1.4.6-SNAPSHOT' into 1.5.2-SNAPSHOT

http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/TFileDumper.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/TFileDumper.java
index 9b9cd51,0000000..4b3e41c
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/TFileDumper.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/TFileDumper.java
@@@ -1,259 -1,0 +1,258 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements. See the NOTICE file distributed with this
 + * work for additional information regarding copyright ownership. The ASF
 + * licenses this file to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + * 
 + * http://www.apache.org/licenses/LICENSE-2.0
 + * 
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 + * License for the specific language governing permissions and limitations under
 + * the License.
 + */
 +package org.apache.accumulo.core.file.rfile.bcfile;
 +
 +import java.io.IOException;
 +import java.io.PrintStream;
 +import java.util.Collection;
 +import java.util.Iterator;
 +import java.util.LinkedHashMap;
 +import java.util.Map;
 +import java.util.Set;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.file.rfile.bcfile.BCFile.BlockRegion;
 +import org.apache.accumulo.core.file.rfile.bcfile.BCFile.MetaIndexEntry;
 +import org.apache.accumulo.core.file.rfile.bcfile.TFile.TFileIndexEntry;
 +import org.apache.accumulo.core.file.rfile.bcfile.Utils.Version;
 +import org.apache.commons.logging.Log;
 +import org.apache.commons.logging.LogFactory;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FSDataInputStream;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.IOUtils;
 +
 +/**
 + * Dumps information about a TFile.
 + */
 +class TFileDumper {
 +  static final Log LOG = LogFactory.getLog(TFileDumper.class);
 +  
 +  private TFileDumper() {
 +    // utility class; not meant to be instantiated.
 +  }
 +  
 +  private enum Align {
 +    LEFT, CENTER, RIGHT, ZERO_PADDED;
 +    static String format(String s, int width, Align align) {
 +      if (s.length() >= width)
 +        return s;
 +      int room = width - s.length();
 +      Align alignAdjusted = align;
 +      if (room == 1) {
 +        alignAdjusted = LEFT;
 +      }
 +      if (alignAdjusted == LEFT) {
 +        return s + String.format("%" + room + "s", "");
 +      }
 +      if (alignAdjusted == RIGHT) {
 +        return String.format("%" + room + "s", "") + s;
 +      }
 +      if (alignAdjusted == CENTER) {
 +        int half = room / 2;
 +        return String.format("%" + half + "s", "") + s + String.format("%" + (room - half) + "s", "");
 +      }
 +      throw new IllegalArgumentException("Unsupported alignment");
 +    }
 +    
 +    static String format(long l, int width, Align align) {
 +      if (align == ZERO_PADDED) {
 +        return String.format("%0" + width + "d", l);
 +      }
 +      return format(Long.toString(l), width, align);
 +    }
 +    
 +    static int calculateWidth(String caption, long max) {
 +      return Math.max(caption.length(), Long.toString(max).length());
 +    }
 +  }
 +  
 +  /**
 +   * Dump information about a TFile.
 +   * 
 +   * @param file
 +   *          Path string of the TFile
 +   * @param out
 +   *          PrintStream to output the information.
 +   * @param conf
 +   *          The configuration object.
-    * @throws IOException
 +   */
 +  public static void dumpInfo(String file, PrintStream out, Configuration conf) throws IOException {
 +    final int maxKeySampleLen = 16;
 +    Path path = new Path(file);
 +    FileSystem fs = path.getFileSystem(conf);
 +    long length = fs.getFileStatus(path).getLen();
 +    FSDataInputStream fsdis = fs.open(path);
 +    TFile.Reader reader = new TFile.Reader(fsdis, length, conf);
 +    try {
 +      LinkedHashMap<String,String> properties = new LinkedHashMap<String,String>();
 +      int blockCnt = reader.readerBCF.getBlockCount();
 +      int metaBlkCnt = reader.readerBCF.metaIndex.index.size();
 +      properties.put("BCFile Version", reader.readerBCF.version.toString());
 +      properties.put("TFile Version", reader.tfileMeta.version.toString());
 +      properties.put("File Length", Long.toString(length));
 +      properties.put("Data Compression", reader.readerBCF.getDefaultCompressionName());
 +      properties.put("Record Count", Long.toString(reader.getEntryCount()));
 +      properties.put("Sorted", Boolean.toString(reader.isSorted()));
 +      if (reader.isSorted()) {
 +        properties.put("Comparator", reader.getComparatorName());
 +      }
 +      properties.put("Data Block Count", Integer.toString(blockCnt));
 +      long dataSize = 0, dataSizeUncompressed = 0;
 +      if (blockCnt > 0) {
 +        for (int i = 0; i < blockCnt; ++i) {
 +          BlockRegion region = reader.readerBCF.dataIndex.getBlockRegionList().get(i);
 +          dataSize += region.getCompressedSize();
 +          dataSizeUncompressed += region.getRawSize();
 +        }
 +        properties.put("Data Block Bytes", Long.toString(dataSize));
 +        if (!"none".equals(reader.readerBCF.getDefaultCompressionName())) {
 +          properties.put("Data Block Uncompressed Bytes", Long.toString(dataSizeUncompressed));
 +          properties.put("Data Block Compression Ratio", String.format("1:%.1f", (double) dataSizeUncompressed / dataSize));
 +        }
 +      }
 +      
 +      properties.put("Meta Block Count", Integer.toString(metaBlkCnt));
 +      long metaSize = 0, metaSizeUncompressed = 0;
 +      if (metaBlkCnt > 0) {
 +        Collection<MetaIndexEntry> metaBlks = reader.readerBCF.metaIndex.index.values();
 +        boolean calculateCompression = false;
 +        for (Iterator<MetaIndexEntry> it = metaBlks.iterator(); it.hasNext();) {
 +          MetaIndexEntry e = it.next();
 +          metaSize += e.getRegion().getCompressedSize();
 +          metaSizeUncompressed += e.getRegion().getRawSize();
 +          if (e.getCompressionAlgorithm() != Compression.Algorithm.NONE) {
 +            calculateCompression = true;
 +          }
 +        }
 +        properties.put("Meta Block Bytes", Long.toString(metaSize));
 +        if (calculateCompression) {
 +          properties.put("Meta Block Uncompressed Bytes", Long.toString(metaSizeUncompressed));
 +          properties.put("Meta Block Compression Ratio", String.format("1:%.1f", (double) metaSizeUncompressed / metaSize));
 +        }
 +      }
 +      properties.put("Meta-Data Size Ratio", String.format("1:%.1f", (double) dataSize / metaSize));
 +      long leftOverBytes = length - dataSize - metaSize;
 +      long miscSize = BCFile.Magic.size() * 2 + Long.SIZE / Byte.SIZE + Version.size();
 +      long metaIndexSize = leftOverBytes - miscSize;
 +      properties.put("Meta Block Index Bytes", Long.toString(metaIndexSize));
 +      properties.put("Headers Etc Bytes", Long.toString(miscSize));
 +      // Now output the properties table.
 +      int maxKeyLength = 0;
 +      Set<Map.Entry<String,String>> entrySet = properties.entrySet();
 +      for (Iterator<Map.Entry<String,String>> it = entrySet.iterator(); it.hasNext();) {
 +        Map.Entry<String,String> e = it.next();
 +        if (e.getKey().length() > maxKeyLength) {
 +          maxKeyLength = e.getKey().length();
 +        }
 +      }
 +      for (Iterator<Map.Entry<String,String>> it = entrySet.iterator(); it.hasNext();) {
 +        Map.Entry<String,String> e = it.next();
 +        out.printf("%s : %s%n", Align.format(e.getKey(), maxKeyLength, Align.LEFT), e.getValue());
 +      }
 +      out.println();
 +      reader.checkTFileDataIndex();
 +      if (blockCnt > 0) {
 +        String blkID = "Data-Block";
 +        int blkIDWidth = Align.calculateWidth(blkID, blockCnt);
 +        int blkIDWidth2 = Align.calculateWidth("", blockCnt);
 +        String offset = "Offset";
 +        int offsetWidth = Align.calculateWidth(offset, length);
 +        String blkLen = "Length";
 +        int blkLenWidth = Align.calculateWidth(blkLen, dataSize / blockCnt * 10);
 +        String rawSize = "Raw-Size";
 +        int rawSizeWidth = Align.calculateWidth(rawSize, dataSizeUncompressed / blockCnt * 10);
 +        String records = "Records";
 +        int recordsWidth = Align.calculateWidth(records, reader.getEntryCount() / blockCnt * 10);
 +        String endKey = "End-Key";
 +        int endKeyWidth = Math.max(endKey.length(), maxKeySampleLen * 2 + 5);
 +        
 +        out.printf("%s %s %s %s %s %s%n", Align.format(blkID, blkIDWidth, Align.CENTER), Align.format(offset, offsetWidth, Align.CENTER),
 +            Align.format(blkLen, blkLenWidth, Align.CENTER), Align.format(rawSize, rawSizeWidth, Align.CENTER),
 +            Align.format(records, recordsWidth, Align.CENTER), Align.format(endKey, endKeyWidth, Align.LEFT));
 +        
 +        for (int i = 0; i < blockCnt; ++i) {
 +          BlockRegion region = reader.readerBCF.dataIndex.getBlockRegionList().get(i);
 +          TFileIndexEntry indexEntry = reader.tfileIndex.getEntry(i);
 +          out.printf("%s %s %s %s %s ", Align.format(Align.format(i, blkIDWidth2, Align.ZERO_PADDED), blkIDWidth, Align.LEFT),
 +              Align.format(region.getOffset(), offsetWidth, Align.LEFT), Align.format(region.getCompressedSize(), blkLenWidth, Align.LEFT),
 +              Align.format(region.getRawSize(), rawSizeWidth, Align.LEFT), Align.format(indexEntry.kvEntries, recordsWidth, Align.LEFT));
 +          byte[] key = indexEntry.key;
 +          boolean asAscii = true;
 +          int sampleLen = Math.min(maxKeySampleLen, key.length);
 +          for (int j = 0; j < sampleLen; ++j) {
 +            byte b = key[j];
 +            if ((b < 32 && b != 9) || (b == 127)) {
 +              asAscii = false;
 +            }
 +          }
 +          if (!asAscii) {
 +            out.print("0X");
 +            for (int j = 0; j < sampleLen; ++j) {
 +              byte b = key[j];
 +              out.printf("%X", b);
 +            }
 +          } else {
 +            out.print(new String(key, 0, sampleLen, Constants.UTF8));
 +          }
 +          if (sampleLen < key.length) {
 +            out.print("...");
 +          }
 +          out.println();
 +        }
 +      }
 +      
 +      out.println();
 +      if (metaBlkCnt > 0) {
 +        String name = "Meta-Block";
 +        int maxNameLen = 0;
 +        Set<Map.Entry<String,MetaIndexEntry>> metaBlkEntrySet = reader.readerBCF.metaIndex.index.entrySet();
 +        for (Iterator<Map.Entry<String,MetaIndexEntry>> it = metaBlkEntrySet.iterator(); it.hasNext();) {
 +          Map.Entry<String,MetaIndexEntry> e = it.next();
 +          if (e.getKey().length() > maxNameLen) {
 +            maxNameLen = e.getKey().length();
 +          }
 +        }
 +        int nameWidth = Math.max(name.length(), maxNameLen);
 +        String offset = "Offset";
 +        int offsetWidth = Align.calculateWidth(offset, length);
 +        String blkLen = "Length";
 +        int blkLenWidth = Align.calculateWidth(blkLen, metaSize / metaBlkCnt * 10);
 +        String rawSize = "Raw-Size";
 +        int rawSizeWidth = Align.calculateWidth(rawSize, metaSizeUncompressed / metaBlkCnt * 10);
 +        String compression = "Compression";
 +        int compressionWidth = compression.length();
 +        out.printf("%s %s %s %s %s%n", Align.format(name, nameWidth, Align.CENTER), Align.format(offset, offsetWidth, Align.CENTER),
 +            Align.format(blkLen, blkLenWidth, Align.CENTER), Align.format(rawSize, rawSizeWidth, Align.CENTER),
 +            Align.format(compression, compressionWidth, Align.LEFT));
 +        
 +        for (Iterator<Map.Entry<String,MetaIndexEntry>> it = metaBlkEntrySet.iterator(); it.hasNext();) {
 +          Map.Entry<String,MetaIndexEntry> e = it.next();
 +          String blkName = e.getValue().getMetaName();
 +          BlockRegion region = e.getValue().getRegion();
 +          String blkCompression = e.getValue().getCompressionAlgorithm().getName();
 +          out.printf("%s %s %s %s %s%n", Align.format(blkName, nameWidth, Align.LEFT), Align.format(region.getOffset(), offsetWidth, Align.LEFT),
 +              Align.format(region.getCompressedSize(), blkLenWidth, Align.LEFT), Align.format(region.getRawSize(), rawSizeWidth, Align.LEFT),
 +              Align.format(blkCompression, compressionWidth, Align.LEFT));
 +        }
 +      }
 +    } finally {
 +      IOUtils.cleanup(LOG, reader, fsdis);
 +    }
 +  }
 +}
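
For reference, a minimal sketch of how the dumpInfo method above might be invoked (the file path is illustrative; since dumpInfo is package-private, a real caller would need to live in org.apache.accumulo.core.file.rfile.bcfile):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;

    class TFileDumperExample {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // "/tmp/example.tf" is a hypothetical TFile path on the default filesystem
        TFileDumper.dumpInfo("/tmp/example.tf", System.out, conf);
      }
    }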

http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Utils.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Utils.java
index 45a59f6,0000000..9131d30
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Utils.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Utils.java
@@@ -1,485 -1,0 +1,474 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements. See the NOTICE file distributed with this
 + * work for additional information regarding copyright ownership. The ASF
 + * licenses this file to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + * 
 + * http://www.apache.org/licenses/LICENSE-2.0
 + * 
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 + * License for the specific language governing permissions and limitations under
 + * the License.
 + */
 +
 +package org.apache.accumulo.core.file.rfile.bcfile;
 +
 +import java.io.DataInput;
 +import java.io.DataOutput;
 +import java.io.IOException;
 +import java.util.Comparator;
 +import java.util.List;
 +
 +import org.apache.hadoop.io.Text;
 +
 +/**
 + * Supporting utility classes used by TFile and shared by users of TFile.
 + */
 +public final class Utils {
 +  
 +  /**
 +   * Prevent the instantiation of Utils.
 +   */
 +  private Utils() {
 +    // nothing
 +  }
 +  
 +  /**
 +   * Encode an integer in a variable-length format. Equivalent to <code>Utils#writeVLong(out, n)</code>.
 +   * 
 +   * @param out
 +   *          output stream
 +   * @param n
 +   *          The integer to be encoded
-    * @throws IOException
 +   * @see Utils#writeVLong(DataOutput, long)
 +   */
 +  public static void writeVInt(DataOutput out, int n) throws IOException {
 +    writeVLong(out, n);
 +  }
 +  
 +  /**
 +   * Encode a long integer in a variable-length format.
 +   * <ul>
 +   * <li>if n in [-32, 127): encode in one byte with the actual value. Otherwise,
 +   * <li>if n in [-20*2^8, 20*2^8): encode in two bytes: byte[0] = n/256 - 52; byte[1]=n&0xff. Otherwise,
 +   * <li>if n IN [-16*2^16, 16*2^16): encode in three bytes: byte[0]=n/2^16 - 88; byte[1]=(n>>8)&0xff; byte[2]=n&0xff. Otherwise,
 +   * <li>if n in [-8*2^24, 8*2^24): encode in four bytes: byte[0]=n/2^24 - 112; byte[1] = (n>>16)&0xff; byte[2] = (n>>8)&0xff; byte[3]=n&0xff. Otherwise:
 +   * <li>if n in [-2^31, 2^31): encode in five bytes: byte[0]=-125; byte[1] = (n>>24)&0xff; byte[2]=(n>>16)&0xff; byte[3]=(n>>8)&0xff; byte[4]=n&0xff;
 +   * <li>if n in [-2^39, 2^39): encode in six bytes: byte[0]=-124; byte[1] = (n>>32)&0xff; byte[2]=(n>>24)&0xff; byte[3]=(n>>16)&0xff; byte[4]=(n>>8)&0xff;
 +   * byte[5]=n&0xff
 +   * <li>if n in [-2^47, 2^47): encode in seven bytes: byte[0]=-123; byte[1] = (n>>40)&0xff; byte[2]=(n>>32)&0xff; byte[3]=(n>>24)&0xff; byte[4]=(n>>16)&0xff;
 +   * byte[5]=(n>>8)&0xff; byte[6]=n&0xff;
 +   * <li>if n in [-2^55, 2^55): encode in eight bytes: byte[0]=-122; byte[1] = (n>>48)&0xff; byte[2] = (n>>40)&0xff; byte[3]=(n>>32)&0xff; byte[4]=(n>>24)&0xff;
 +   * byte[5]=(n>>16)&0xff; byte[6]=(n>>8)&0xff; byte[7]=n&0xff;
 +   * <li>if n in [-2^63, 2^63): encode in nine bytes: byte[0]=-121; byte[1] = (n>>56)&0xff; byte[2] = (n>>48)&0xff; byte[3] = (n>>40)&0xff;
 +   * byte[4]=(n>>32)&0xff; byte[5]=(n>>24)&0xff; byte[6]=(n>>16)&0xff; byte[7]=(n>>8)&0xff; byte[8]=n&0xff;
 +   * </ul>
 +   * 
 +   * @param out
 +   *          output stream
 +   * @param n
 +   *          the integer number
-    * @throws IOException
 +   */
 +  @SuppressWarnings("fallthrough")
 +  public static void writeVLong(DataOutput out, long n) throws IOException {
 +    if ((n < 128) && (n >= -32)) {
 +      out.writeByte((int) n);
 +      return;
 +    }
 +    
 +    long un = (n < 0) ? ~n : n;
 +    // how many bytes do we need to represent the number with sign bit?
 +    int len = (Long.SIZE - Long.numberOfLeadingZeros(un)) / 8 + 1;
 +    int firstByte = (int) (n >> ((len - 1) * 8));
 +    switch (len) {
 +      case 1:
 +        // fall through to firstByte==-1, len=2.
 +        firstByte >>= 8;
 +      case 2:
 +        if ((firstByte < 20) && (firstByte >= -20)) {
 +          out.writeByte(firstByte - 52);
 +          out.writeByte((int) n);
 +          return;
 +        }
 +        // fall through to firstByte==0/-1, len=3.
 +        firstByte >>= 8;
 +      case 3:
 +        if ((firstByte < 16) && (firstByte >= -16)) {
 +          out.writeByte(firstByte - 88);
 +          out.writeShort((int) n);
 +          return;
 +        }
 +        // fall through to firstByte==0/-1, len=4.
 +        firstByte >>= 8;
 +      case 4:
 +        if ((firstByte < 8) && (firstByte >= -8)) {
 +          out.writeByte(firstByte - 112);
 +          out.writeShort(((int) n) >>> 8);
 +          out.writeByte((int) n);
 +          return;
 +        }
 +        out.writeByte(len - 129);
 +        out.writeInt((int) n);
 +        return;
 +      case 5:
 +        out.writeByte(len - 129);
 +        out.writeInt((int) (n >>> 8));
 +        out.writeByte((int) n);
 +        return;
 +      case 6:
 +        out.writeByte(len - 129);
 +        out.writeInt((int) (n >>> 16));
 +        out.writeShort((int) n);
 +        return;
 +      case 7:
 +        out.writeByte(len - 129);
 +        out.writeInt((int) (n >>> 24));
 +        out.writeShort((int) (n >>> 8));
 +        out.writeByte((int) n);
 +        return;
 +      case 8:
 +        out.writeByte(len - 129);
 +        out.writeLong(n);
 +        return;
 +      default:
 +        throw new RuntimeException("Internel error");
 +    }
 +  }
 +  
 +  /**
 +   * Decode a variable-length integer. Equivalent to <code>(int)Utils#readVLong(in)</code>.
 +   * 
 +   * @param in
 +   *          input stream
 +   * @return the decoded integer
-    * @throws IOException
 +   * 
 +   * @see Utils#readVLong(DataInput)
 +   */
 +  public static int readVInt(DataInput in) throws IOException {
 +    long ret = readVLong(in);
 +    if ((ret > Integer.MAX_VALUE) || (ret < Integer.MIN_VALUE)) {
 +      throw new RuntimeException("Number too large to be represented as Integer");
 +    }
 +    return (int) ret;
 +  }
 +  
 +  /**
 +   * Decode a variable-length integer. Suppose the value of the first byte is FB, and the following bytes are NB[*].
 +   * <ul>
 +   * <li>if (FB >= -32), return (long)FB;
 +   * <li>if (FB in [-72, -33]), return (FB+52)<<8 + NB[0]&0xff;
 +   * <li>if (FB in [-104, -73]), return (FB+88)<<16 + (NB[0]&0xff)<<8 + NB[1]&0xff;
 +   * <li>if (FB in [-120, -105]), return (FB+112)<<24 + (NB[0]&0xff)<<16 + (NB[1]&0xff)<<8 + NB[2]&0xff;
 +   * <li>if (FB in [-128, -121]), interpret the next (FB+129) bytes as a signed big-endian integer and return it.
 +   * </ul>
 +   * 
 +   * @param in
 +   *          input stream
 +   * @return the decoded long integer.
-    * @throws IOException
 +   */
 +  
 +  public static long readVLong(DataInput in) throws IOException {
 +    int firstByte = in.readByte();
 +    if (firstByte >= -32) {
 +      return firstByte;
 +    }
 +    
 +    switch ((firstByte + 128) / 8) {
 +      case 11:
 +      case 10:
 +      case 9:
 +      case 8:
 +      case 7:
 +        return ((firstByte + 52) << 8) | in.readUnsignedByte();
 +      case 6:
 +      case 5:
 +      case 4:
 +      case 3:
 +        return ((firstByte + 88) << 16) | in.readUnsignedShort();
 +      case 2:
 +      case 1:
 +        return ((firstByte + 112) << 24) | (in.readUnsignedShort() << 8) | in.readUnsignedByte();
 +      case 0:
 +        int len = firstByte + 129;
 +        switch (len) {
 +          case 4:
 +            return in.readInt();
 +          case 5:
 +            return ((long) in.readInt()) << 8 | in.readUnsignedByte();
 +          case 6:
 +            return ((long) in.readInt()) << 16 | in.readUnsignedShort();
 +          case 7:
 +            return ((long) in.readInt()) << 24 | (in.readUnsignedShort() << 8) | in.readUnsignedByte();
 +          case 8:
 +            return in.readLong();
 +          default:
 +            throw new IOException("Corrupted VLong encoding");
 +        }
 +      default:
 +        throw new RuntimeException("Internal error");
 +    }
 +  }
 +  
 +  /**
 +   * Write a String as a VInt n, followed by n Bytes as in Text format.
-    * 
-    * @param out
-    * @param s
-    * @throws IOException
 +   */
 +  public static void writeString(DataOutput out, String s) throws IOException {
 +    if (s != null) {
 +      Text text = new Text(s);
 +      byte[] buffer = text.getBytes();
 +      int len = text.getLength();
 +      writeVInt(out, len);
 +      out.write(buffer, 0, len);
 +    } else {
 +      writeVInt(out, -1);
 +    }
 +  }
 +  
 +  /**
 +   * Read a String as a VInt n, followed by n Bytes in Text format.
 +   * 
 +   * @param in
 +   *          The input stream.
 +   * @return The string
-    * @throws IOException
 +   */
 +  public static String readString(DataInput in) throws IOException {
 +    int length = readVInt(in);
 +    if (length == -1)
 +      return null;
 +    byte[] buffer = new byte[length];
 +    in.readFully(buffer);
 +    return Text.decode(buffer);
 +  }
 +  
 +  /**
 +   * A generic Version class. We suggest applications built on top of TFile use this class to maintain version information in their meta blocks.
 +   * 
 +   * A version number consists of a major version and a minor version. The suggested usage of major and minor version number is to increment major version
 +   * number when the new storage format is not backward compatible, and increment the minor version otherwise.
 +   */
 +  public static final class Version implements Comparable<Version> {
 +    private final short major;
 +    private final short minor;
 +    
 +    /**
 +     * Construct the Version object by reading from the input stream.
 +     * 
 +     * @param in
 +     *          input stream
-      * @throws IOException
 +     */
 +    public Version(DataInput in) throws IOException {
 +      major = in.readShort();
 +      minor = in.readShort();
 +    }
 +    
 +    /**
 +     * Constructor.
 +     * 
 +     * @param major
 +     *          major version.
 +     * @param minor
 +     *          minor version.
 +     */
 +    public Version(short major, short minor) {
 +      this.major = major;
 +      this.minor = minor;
 +    }
 +    
 +    /**
 +     * Write the object to a DataOutput. The serialized format of the Version is major version followed by minor version, both as big-endian short integers.
 +     * 
 +     * @param out
 +     *          The DataOutput object.
-      * @throws IOException
 +     */
 +    public void write(DataOutput out) throws IOException {
 +      out.writeShort(major);
 +      out.writeShort(minor);
 +    }
 +    
 +    /**
 +     * Get the major version.
 +     * 
 +     * @return Major version.
 +     */
 +    public int getMajor() {
 +      return major;
 +    }
 +    
 +    /**
 +     * Get the minor version.
 +     * 
 +     * @return The minor version.
 +     */
 +    public int getMinor() {
 +      return minor;
 +    }
 +    
 +    /**
 +     * Get the size of the serialized Version object.
 +     * 
 +     * @return serialized size of the version object.
 +     */
 +    public static int size() {
 +      return (Short.SIZE + Short.SIZE) / Byte.SIZE;
 +    }
 +    
 +    /**
 +     * Return a string representation of the version.
 +     */
 +    @Override
 +    public String toString() {
 +      return new StringBuilder("v").append(major).append(".").append(minor).toString();
 +    }
 +    
 +    /**
 +     * Test compatibility.
 +     * 
 +     * @param other
 +     *          The Version object to test compatibility with.
 +     * @return true if both versions have the same major version number; false otherwise.
 +     */
 +    public boolean compatibleWith(Version other) {
 +      return major == other.major;
 +    }
 +    
 +    /**
 +     * Compare this version with another version.
 +     */
 +    @Override
 +    public int compareTo(Version that) {
 +      if (major != that.major) {
 +        return major - that.major;
 +      }
 +      return minor - that.minor;
 +    }
 +    
 +    @Override
 +    public boolean equals(Object other) {
 +      if (this == other)
 +        return true;
 +      if (!(other instanceof Version))
 +        return false;
 +      return compareTo((Version) other) == 0;
 +    }
 +    
 +    @Override
 +    public int hashCode() {
 +      return (major << 16) + minor;
 +    }
 +  }
 +  
 +  /**
 +   * Lower bound binary search. Find the index to the first element in the list that compares greater than or equal to key.
 +   * 
 +   * @param <T>
 +   *          Type of the input key.
 +   * @param list
 +   *          The list
 +   * @param key
 +   *          The input key.
 +   * @param cmp
 +   *          Comparator for the key.
 +   * @return The index to the desired element if it exists; or list.size() otherwise.
 +   */
 +  public static <T> int lowerBound(List<? extends T> list, T key, Comparator<? super T> cmp) {
 +    int low = 0;
 +    int high = list.size();
 +    
 +    while (low < high) {
 +      int mid = (low + high) >>> 1;
 +      T midVal = list.get(mid);
 +      int ret = cmp.compare(midVal, key);
 +      if (ret < 0)
 +        low = mid + 1;
 +      else
 +        high = mid;
 +    }
 +    return low;
 +  }
 +  
 +  /**
 +   * Upper bound binary search. Find the index to the first element in the list that compares greater than the input key.
 +   * 
 +   * @param <T>
 +   *          Type of the input key.
 +   * @param list
 +   *          The list
 +   * @param key
 +   *          The input key.
 +   * @param cmp
 +   *          Comparator for the key.
 +   * @return The index to the desired element if it exists; or list.size() otherwise.
 +   */
 +  public static <T> int upperBound(List<? extends T> list, T key, Comparator<? super T> cmp) {
 +    int low = 0;
 +    int high = list.size();
 +    
 +    while (low < high) {
 +      int mid = (low + high) >>> 1;
 +      T midVal = list.get(mid);
 +      int ret = cmp.compare(midVal, key);
 +      if (ret <= 0)
 +        low = mid + 1;
 +      else
 +        high = mid;
 +    }
 +    return low;
 +  }
 +  
 +  /**
 +   * Lower bound binary search. Find the index to the first element in the list that compares greater than or equal to key.
 +   * 
 +   * @param <T>
 +   *          Type of the input key.
 +   * @param list
 +   *          The list
 +   * @param key
 +   *          The input key.
 +   * @return The index to the desired element if it exists; or list.size() otherwise.
 +   */
 +  public static <T> int lowerBound(List<? extends Comparable<? super T>> list, T key) {
 +    int low = 0;
 +    int high = list.size();
 +    
 +    while (low < high) {
 +      int mid = (low + high) >>> 1;
 +      Comparable<? super T> midVal = list.get(mid);
 +      int ret = midVal.compareTo(key);
 +      if (ret < 0)
 +        low = mid + 1;
 +      else
 +        high = mid;
 +    }
 +    return low;
 +  }
 +  
 +  /**
 +   * Upper bound binary search. Find the index to the first element in the list that compares greater than the input key.
 +   * 
 +   * @param <T>
 +   *          Type of the input key.
 +   * @param list
 +   *          The list
 +   * @param key
 +   *          The input key.
 +   * @return The index to the desired element if it exists; or list.size() otherwise.
 +   */
 +  public static <T> int upperBound(List<? extends Comparable<? super T>> list, T key) {
 +    int low = 0;
 +    int high = list.size();
 +    
 +    while (low < high) {
 +      int mid = (low + high) >>> 1;
 +      Comparable<? super T> midVal = list.get(mid);
 +      int ret = midVal.compareTo(key);
 +      if (ret <= 0)
 +        low = mid + 1;
 +      else
 +        high = mid;
 +    }
 +    return low;
 +  }
 +}
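
To make the encoding rules spelled out in the writeVLong javadoc above concrete, here is a small round-trip sketch (class name and sample values are illustrative):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.accumulo.core.file.rfile.bcfile.Utils;

    class VLongRoundTrip {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        Utils.writeVLong(out, 100L);      // in [-32, 128): one byte
        Utils.writeVLong(out, 5000L);     // in [-20*2^8, 20*2^8): two bytes
        Utils.writeVLong(out, 1L << 40);  // in [-2^47, 2^47): seven bytes
        out.flush();

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
        System.out.println(Utils.readVLong(in)); // 100
        System.out.println(Utils.readVLong(in)); // 5000
        System.out.println(Utils.readVLong(in)); // 1099511627776
      }
    }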

http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/core/src/main/java/org/apache/accumulo/core/iterators/TypedValueCombiner.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/iterators/TypedValueCombiner.java
index 54a1333,0000000..5b7b05c
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/TypedValueCombiner.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/TypedValueCombiner.java
@@@ -1,249 -1,0 +1,243 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.iterators;
 +
 +import java.io.IOException;
 +import java.util.Iterator;
 +import java.util.Map;
 +import java.util.NoSuchElementException;
 +
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
 +
 +/**
 + * A Combiner that decodes each Value to type V before reducing, then encodes the result of typedReduce back to Value.
 + * 
 + * Subclasses must implement a typedReduce method: public V typedReduce(Key key, Iterator<V> iter);
 + * 
 + * This typedReduce method will be passed the most recent Key and an iterator over the Values (translated to Vs) for all non-deleted versions of that Key.
 + * 
 + * Subclasses may implement a switch on the "type" variable to choose an Encoder in their init method.
 + */
 +public abstract class TypedValueCombiner<V> extends Combiner {
 +  private Encoder<V> encoder = null;
 +  private boolean lossy = false;
 +  
 +  protected static final String LOSSY = "lossy";
 +  
 +  /**
 +   * A Java Iterator that translates an Iterator<Value> to an Iterator<V> using the decode method of an Encoder.
 +   */
 +  private static class VIterator<V> implements Iterator<V> {
 +    private Iterator<Value> source;
 +    private Encoder<V> encoder;
 +    private boolean lossy;
 +    
 +    /**
 +     * Constructs an Iterator<V> from an Iterator<Value>
 +     * 
 +     * @param iter
 +     *          The source iterator
 +     * 
 +     * @param encoder
 +     *          The Encoder whose decode method is used to translate from Value to V
 +     * 
 +     * @param lossy
 +     *          Determines whether to error on failure to decode or ignore and move on
 +     */
 +    VIterator(Iterator<Value> iter, Encoder<V> encoder, boolean lossy) {
 +      this.source = iter;
 +      this.encoder = encoder;
 +      this.lossy = lossy;
 +    }
 +    
 +    V next = null;
 +    boolean hasNext = false;
 +    
 +    @Override
 +    public boolean hasNext() {
 +      if (hasNext)
 +        return true;
 +      
 +      while (true) {
 +        if (!source.hasNext())
 +          return false;
 +        try {
 +          next = encoder.decode(source.next().get());
 +          return hasNext = true;
 +        } catch (ValueFormatException vfe) {
 +          if (!lossy)
 +            throw vfe;
 +        }
 +      }
 +    }
 +    
 +    @Override
 +    public V next() {
 +      if (!hasNext && !hasNext())
 +        throw new NoSuchElementException();
 +      V toRet = next;
 +      next = null;
 +      hasNext = false;
 +      return toRet;
 +    }
 +    
 +    @Override
 +    public void remove() {
 +      source.remove();
 +    }
 +  }
 +  
 +  /**
 +   * An interface for translating from byte[] to V and back.
 +   */
 +  public static interface Encoder<V> {
 +    public byte[] encode(V v);
 +    
 +    public V decode(byte[] b) throws ValueFormatException;
 +  }
 +  
 +  /**
 +   * Sets the Encoder<V> used to translate Values to V and back.
-    * 
-    * @param encoder
 +   */
 +  protected void setEncoder(Encoder<V> encoder) {
 +    this.encoder = encoder;
 +  }
 +  
 +  /**
 +   * Instantiates and sets the Encoder<V> used to translate Values to V and back.
 +   * 
-    * @param encoderClass
 +   * @throws IllegalArgumentException
 +   *           if ClassNotFoundException, InstantiationException, or IllegalAccessException occurs
 +   */
 +  protected void setEncoder(String encoderClass) {
 +    try {
 +      @SuppressWarnings("unchecked")
 +      Class<? extends Encoder<V>> clazz = (Class<? extends Encoder<V>>) AccumuloVFSClassLoader.loadClass(encoderClass, Encoder.class);
 +      encoder = clazz.newInstance();
 +    } catch (ClassNotFoundException e) {
 +      throw new IllegalArgumentException(e);
 +    } catch (InstantiationException e) {
 +      throw new IllegalArgumentException(e);
 +    } catch (IllegalAccessException e) {
 +      throw new IllegalArgumentException(e);
 +    }
 +  }
 +  
 +  /**
 +   * Tests whether v remains the same when encoded and decoded with the current encoder.
 +   * 
-    * @param v
 +   * @throws IllegalStateException
 +   *           if an encoder has not been set.
 +   * @throws IllegalArgumentException
 +   *           if the test fails.
 +   */
 +  protected void testEncoder(V v) {
 +    if (encoder == null)
 +      throw new IllegalStateException("encoder has not been initialized");
 +    testEncoder(encoder, v);
 +  }
 +  
 +  /**
 +   * Tests whether v remains the same when encoded and decoded with the given encoder.
 +   * 
-    * @param encoder
-    * @param v
 +   * @throws IllegalArgumentException
 +   *           if the test fails.
 +   */
 +  public static <V> void testEncoder(Encoder<V> encoder, V v) {
 +    try {
 +      if (!v.equals(encoder.decode(encoder.encode(v))))
 +        throw new IllegalArgumentException("something wrong with " + encoder.getClass().getName() + " -- doesn't encode and decode " + v + " properly");
 +    } catch (ClassCastException e) {
 +      throw new IllegalArgumentException(encoder.getClass().getName() + " doesn't encode " + v.getClass().getName());
 +    }
 +  }
 +  
 +  @SuppressWarnings("unchecked")
 +  @Override
 +  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
 +    TypedValueCombiner<V> newInstance = (TypedValueCombiner<V>) super.deepCopy(env);
 +    newInstance.setEncoder(encoder);
 +    return newInstance;
 +  }
 +  
 +  @Override
 +  public Value reduce(Key key, Iterator<Value> iter) {
 +    return new Value(encoder.encode(typedReduce(key, new VIterator<V>(iter, encoder, lossy))));
 +  }
 +  
 +  @Override
 +  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
 +    super.init(source, options, env);
 +    setLossyness(options);
 +  }
 +  
 +  private void setLossyness(Map<String,String> options) {
 +    String loss = options.get(LOSSY);
 +    if (loss == null)
 +      lossy = false;
 +    else
 +      lossy = Boolean.parseBoolean(loss);
 +  }
 +  
 +  @Override
 +  public IteratorOptions describeOptions() {
 +    IteratorOptions io = super.describeOptions();
 +    io.addNamedOption(LOSSY, "if true, failed decodes are ignored. Otherwise combiner will error on failed decodes (default false): <TRUE|FALSE>");
 +    return io;
 +  }
 +  
 +  @Override
 +  public boolean validateOptions(Map<String,String> options) {
 +    if (!super.validateOptions(options))
 +      return false;
 +    try {
 +      setLossyness(options);
 +    } catch (Exception e) {
 +      throw new IllegalArgumentException("bad boolean " + LOSSY + ":" + options.get(LOSSY));
 +    }
 +    return true;
 +  }
 +  
 +  /**
 +   * A convenience method to set the "lossy" option on a TypedValueCombiner. If true, the combiner will ignore any values which fail to decode. Otherwise, the
 +   * combiner will throw an error which will interrupt the action (and prevent potential data loss). False is the default behavior.
 +   * 
 +   * @param is
 +   *          iterator settings object to configure
 +   * @param lossy
 +   *          if true, the combiner will ignore values that fail to decode; otherwise it will error.
 +   */
 +  public static void setLossyness(IteratorSetting is, boolean lossy) {
 +    is.addOption(LOSSY, Boolean.toString(lossy));
 +  }
 +  
 +  /**
 +   * Reduces a list of V into a single V.
 +   * 
 +   * @param key
 +   *          The most recent version of the Key being reduced.
 +   * 
 +   * @param iter
 +   *          An iterator over the V for different versions of the key.
 +   * 
 +   * @return The combined V.
 +   */
 +  public abstract V typedReduce(Key key, Iterator<V> iter);
 +}
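
As a concrete illustration of the typedReduce contract described above, here is a hedged sketch of a subclass (the class and its hand-rolled string encoder are illustrative, not one of the combiners shipped with Accumulo):

    import java.io.IOException;
    import java.util.Iterator;
    import java.util.Map;

    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Value;
    import org.apache.accumulo.core.iterators.IteratorEnvironment;
    import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
    import org.apache.accumulo.core.iterators.TypedValueCombiner;
    import org.apache.accumulo.core.iterators.ValueFormatException;

    // Sums long values; values are encoded as decimal strings for simplicity.
    public class SummingStringCombiner extends TypedValueCombiner<Long> {
      public static class StringLongEncoder implements Encoder<Long> {
        @Override
        public byte[] encode(Long v) {
          return v.toString().getBytes();
        }

        @Override
        public Long decode(byte[] b) throws ValueFormatException {
          try {
            return Long.parseLong(new String(b));
          } catch (NumberFormatException nfe) {
            throw new ValueFormatException(nfe);
          }
        }
      }

      @Override
      public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
        super.init(source, options, env);
        setEncoder(new StringLongEncoder());
      }

      @Override
      public Long typedReduce(Key key, Iterator<Long> iter) {
        long sum = 0;
        while (iter.hasNext())
          sum += iter.next();
        return sum;
      }
    }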

http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/core/src/main/java/org/apache/accumulo/core/iterators/ValueFormatException.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/iterators/ValueFormatException.java
index 7bb2228,0000000..7ede7fe
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/ValueFormatException.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/ValueFormatException.java
@@@ -1,40 -1,0 +1,34 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.iterators;
 +
 +/**
 + * Exception used by TypedValueCombiner and its Encoder's decode() function.
 + */
 +public class ValueFormatException extends IllegalArgumentException {
 +  
-   /**
-    * @param string
-    */
 +  public ValueFormatException(String string) {
 +    super(string);
 +  }
 +
-   /**
-    * @param nfe
-    */
 +  public ValueFormatException(Exception nfe) {
 +    super(nfe);
 +  }
 +
 +  private static final long serialVersionUID = 4170291568272971821L;
 +  
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/core/src/main/java/org/apache/accumulo/core/iterators/system/MapFileIterator.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/iterators/system/MapFileIterator.java
index e5fe62a,0000000..37a234c
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/system/MapFileIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/MapFileIterator.java
@@@ -1,162 -1,0 +1,156 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.iterators.system;
 +
 +import java.io.DataInputStream;
 +import java.io.IOException;
 +import java.util.Collection;
 +import java.util.Map;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.data.ByteSequence;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.file.FileSKVIterator;
 +import org.apache.accumulo.core.file.NoSuchMetaStoreException;
 +import org.apache.accumulo.core.file.map.MapFileUtil;
 +import org.apache.accumulo.core.iterators.IterationInterruptedException;
 +import org.apache.accumulo.core.iterators.IteratorEnvironment;
 +import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.MapFile.Reader;
 +import org.apache.log4j.Logger;
 +
 +public class MapFileIterator implements FileSKVIterator {
 +  private static final Logger log = Logger.getLogger(MapFileIterator.class);
 +
 +  private Reader reader;
 +  private Value topValue;
 +  private Key topKey;
 +  private AtomicBoolean interruptFlag;
 +  private int interruptCheckCount = 0;
 +  private FileSystem fs;
 +  private String dirName;
 +  
-   /**
-    * @param acuconf
-    * @param fs
-    * @param dir
-    * @param conf
-    * @throws IOException
-    */
 +  public MapFileIterator(AccumuloConfiguration acuconf, FileSystem fs, String dir, Configuration conf) throws IOException {
 +    this.reader = MapFileUtil.openMapFile(acuconf, fs, dir, conf);
 +    this.fs = fs;
 +    this.dirName = dir;
 +  }
 +
 +  @Override
 +  public void setInterruptFlag(AtomicBoolean flag) {
 +    this.interruptFlag = flag;
 +  }
 +  
 +  @Override
 +  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
 +    throw new UnsupportedOperationException();
 +  }
 +  
 +  @Override
 +  public boolean hasTop() {
 +    return topKey != null;
 +  }
 +  
 +  @Override
 +  public void next() throws IOException {
 +    if (interruptFlag != null && interruptCheckCount++ % 100 == 0 && interruptFlag.get())
 +      throw new IterationInterruptedException();
 +    
 +    reader.next(topKey, topValue);
 +  }
 +  
++  @Override
 +  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
 +    if (columnFamilies.size() != 0 || inclusive) {
 +      throw new IllegalArgumentException("I do not know how to filter column families");
 +    }
 +    
 +    if (range == null)
 +      throw new IllegalArgumentException("Cannot seek to null range");
 +    
 +    if (interruptFlag != null && interruptFlag.get())
 +      throw new IterationInterruptedException();
 +    
 +    Key key = range.getStartKey();
 +    if (key == null) {
 +      key = new Key();
 +    }
 +    
 +    reader.seek(key);
 +    
 +    while (hasTop() && range.beforeStartKey(getTopKey())) {
 +      next();
 +    }
 +  }
 +  
 +  @Override
 +  public Key getTopKey() {
 +    return topKey;
 +  }
 +  
 +  @Override
 +  public Value getTopValue() {
 +    return topValue;
 +  }
 +  
 +  @Override
 +  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
 +    try {
 +      SortedKeyValueIterator<Key,Value> other = env.reserveMapFileReader(dirName);
 +      ((InterruptibleIterator) other).setInterruptFlag(interruptFlag);
 +      log.debug("deep copying MapFile: " + this + " -> " + other);
 +      return other;
 +    } catch (IOException e) {
 +      log.error("failed to clone map file reader", e);
 +      throw new RuntimeException(e);
 +    }
 +  }
 +  
 +  @Override
 +  public Key getFirstKey() throws IOException {
 +    throw new UnsupportedOperationException();
 +  }
 +  
 +  @Override
 +  public Key getLastKey() throws IOException {
 +    throw new UnsupportedOperationException();
 +  }
 +  
 +  @Override
 +  public DataInputStream getMetaStore(String name) throws IOException {
 +    Path path = new Path(this.dirName, name);
 +    if (!fs.exists(path))
 +      throw new NoSuchMetaStoreException("name = " + name);
 +    return fs.open(path);
 +  }
 +  
 +  @Override
 +  public void closeDeepCopies() throws IOException {
 +    // nothing to do, deep copies are externally managed/closed
 +  }
 +  
 +  @Override
 +  public void close() throws IOException {
 +    reader.close();
 +  }
 +}
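
A hedged usage sketch for the iterator above (the directory argument is illustrative; note that seek rejects column-family filtering, so an empty family set with inclusive=false is passed):

    import java.io.IOException;
    import java.util.Collections;

    import org.apache.accumulo.core.conf.AccumuloConfiguration;
    import org.apache.accumulo.core.data.ByteSequence;
    import org.apache.accumulo.core.data.Range;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    class MapFileDump {
      static void dump(AccumuloConfiguration acuconf, FileSystem fs, String dir, Configuration conf) throws IOException {
        MapFileIterator iter = new MapFileIterator(acuconf, fs, dir, conf);
        try {
          iter.seek(new Range(), Collections.<ByteSequence> emptySet(), false);
          while (iter.hasTop()) {
            System.out.println(iter.getTopKey() + " -> " + iter.getTopValue());
            iter.next();
          }
        } finally {
          iter.close();
        }
      }
    }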

http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/core/src/main/java/org/apache/accumulo/core/iterators/user/GrepIterator.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/iterators/user/GrepIterator.java
index 4f8207c,0000000..86798dd
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/user/GrepIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/user/GrepIterator.java
@@@ -1,104 -1,0 +1,101 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.iterators.user;
 +
 +import java.io.IOException;
 +import java.util.Arrays;
 +import java.util.Map;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.data.ByteSequence;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.iterators.Filter;
 +import org.apache.accumulo.core.iterators.IteratorEnvironment;
 +import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 +
 +/**
 + * This iterator provides exact string matching. It searches both the Key and Value for the string. The string to match is specified by the "term" option.
 + */
 +public class GrepIterator extends Filter {
 +  
 +  private byte term[];
 +  
 +  @Override
 +  public boolean accept(Key k, Value v) {
 +    return match(v.get()) || match(k.getRowData()) || match(k.getColumnFamilyData()) || match(k.getColumnQualifierData());
 +  }
 +  
 +  private boolean match(ByteSequence bs) {
 +    return indexOf(bs.getBackingArray(), bs.offset(), bs.length(), term) >= 0;
 +  }
 +  
 +  private boolean match(byte[] ba) {
 +    return indexOf(ba, 0, ba.length, term) >= 0;
 +  }
 +  
 +  // copied code below from java string and modified
 +  
 +  private static int indexOf(byte[] source, int sourceOffset, int sourceCount, byte[] target) {
 +    byte first = target[0];
 +    int targetCount = target.length;
 +    int max = sourceOffset + (sourceCount - targetCount);
 +    
 +    for (int i = sourceOffset; i <= max; i++) {
 +      /* Look for first character. */
 +      if (source[i] != first) {
 +        while (++i <= max && source[i] != first)
 +          continue;
 +      }
 +      
 +      /* Found first character, now look at the rest of the target */
 +      if (i <= max) {
 +        int j = i + 1;
 +        int end = j + targetCount - 1;
 +        for (int k = 1; j < end && source[j] == target[k]; j++, k++)
 +          continue;
 +        
 +        if (j == end) {
 +          /* Found whole string. */
 +          return i - sourceOffset;
 +        }
 +      }
 +    }
 +    return -1;
 +  }
 +  
 +  @Override
 +  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
 +    GrepIterator copy = (GrepIterator) super.deepCopy(env);
 +    copy.term = Arrays.copyOf(term, term.length);
 +    return copy;
 +  }
 +  
 +  @Override
 +  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
 +    super.init(source, options, env);
 +    term = options.get("term").getBytes(Constants.UTF8);
 +  }
 +  
 +  /**
 +   * Encode the grep term as an option for a ScanIterator
-    * 
-    * @param cfg
-    * @param term
 +   */
 +  public static void setTerm(IteratorSetting cfg, String term) {
 +    cfg.addOption("term", term);
 +  }
 +}
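
A hedged sketch of wiring the iterator above into a scan (the priority, iterator name, and term are illustrative; IteratorSetting and ScannerBase.addScanIterator are the standard client-side hooks):

    import org.apache.accumulo.core.client.IteratorSetting;
    import org.apache.accumulo.core.client.Scanner;
    import org.apache.accumulo.core.iterators.user.GrepIterator;

    class GrepExample {
      static void grepFor(Scanner scanner, String term) {
        IteratorSetting cfg = new IteratorSetting(30, "grep", GrepIterator.class);
        GrepIterator.setTerm(cfg, term);
        scanner.addScanIterator(cfg);
        // iterating the scanner now returns only entries whose key or value contains the term
      }
    }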

http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/core/src/main/java/org/apache/accumulo/core/iterators/user/IntersectingIterator.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/iterators/user/IntersectingIterator.java
index 447200b,0000000..39cba6d
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/user/IntersectingIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/user/IntersectingIterator.java
@@@ -1,558 -1,0 +1,548 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.iterators.user;
 +
 +import java.io.IOException;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.Map;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.data.ArrayByteSequence;
 +import org.apache.accumulo.core.data.ByteSequence;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.PartialKey;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.iterators.IteratorEnvironment;
 +import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 +import org.apache.accumulo.core.util.TextUtil;
 +import org.apache.commons.codec.binary.Base64;
 +import org.apache.hadoop.io.Text;
 +import org.apache.log4j.Logger;
 +
 +/**
 + * This iterator facilitates document-partitioned indexing. It involves grouping a set of documents together and indexing those documents into a single row of
 + * an Accumulo table. This allows a tablet server to perform boolean AND operations on terms in the index.
 + * 
 + * The table structure should have the following form:
 + * 
 + * row: shardID, colfam: term, colqual: docID
 + * 
 + * When you configure this iterator with a set of terms (column families), it will return only the docIDs that appear with all of the specified terms. The
 + * result will have an empty column family, as follows:
 + * 
 + * row: shardID, colfam: (empty), colqual: docID
 + * 
 + * This iterator is commonly used with BatchScanner or AccumuloInputFormat, to parallelize the search over all shardIDs.
 + * 
 + * This iterator will *ignore* any columnFamilies passed to {@link #seek(Range, Collection, boolean)} as it performs intersections over terms. Extending classes
 + * should override the {@link TermSource#seekColfams} in their implementation's {@link #init(SortedKeyValueIterator, Map, IteratorEnvironment)} method.
 + * 
 + * README.shard in docs/examples shows an example of using the IntersectingIterator.
 + */
 +public class IntersectingIterator implements SortedKeyValueIterator<Key,Value> {
 +  
 +  protected Text nullText = new Text();
 +  
 +  protected Text getPartition(Key key) {
 +    return key.getRow();
 +  }
 +  
 +  protected Text getTerm(Key key) {
 +    return key.getColumnFamily();
 +  }
 +  
 +  protected Text getDocID(Key key) {
 +    return key.getColumnQualifier();
 +  }
 +  
 +  protected Key buildKey(Text partition, Text term) {
 +    return new Key(partition, (term == null) ? nullText : term);
 +  }
 +  
 +  protected Key buildKey(Text partition, Text term, Text docID) {
 +    return new Key(partition, (term == null) ? nullText : term, docID);
 +  }
 +  
 +  protected Key buildFollowingPartitionKey(Key key) {
 +    return key.followingKey(PartialKey.ROW);
 +  }
 +  
 +  protected static final Logger log = Logger.getLogger(IntersectingIterator.class);
 +  
 +  public static class TermSource {
 +    public SortedKeyValueIterator<Key,Value> iter;
 +    public Text term;
 +    public Collection<ByteSequence> seekColfams;
 +    public boolean notFlag;
 +    
 +    public TermSource(TermSource other) {
 +      this.iter = other.iter;
 +      this.term = other.term;
 +      this.notFlag = other.notFlag;
 +      this.seekColfams = other.seekColfams;
 +    }
 +    
 +    public TermSource(SortedKeyValueIterator<Key,Value> iter, Text term) {
 +      this(iter, term, false);
 +    }
 +    
 +    public TermSource(SortedKeyValueIterator<Key,Value> iter, Text term, boolean notFlag) {
 +      this.iter = iter;
 +      this.term = term;
 +      this.notFlag = notFlag;
 +      // The desired column families for this source is the term itself
 +      this.seekColfams = Collections.<ByteSequence> singletonList(new ArrayByteSequence(term.getBytes(), 0, term.getLength()));
 +    }
 +    
 +    public String getTermString() {
 +      return (this.term == null) ? "Iterator" : this.term.toString();
 +    }
 +  }
 +  
 +  protected TermSource[] sources;
 +  int sourcesCount = 0;
 +  
 +  Range overallRange;
 +  
 +  // query-time settings
 +  protected Text currentPartition = null;
 +  protected Text currentDocID = new Text(emptyByteArray);
 +  static final byte[] emptyByteArray = new byte[0];
 +  
 +  protected Key topKey = null;
 +  protected Value value = new Value(emptyByteArray);
 +  
 +  public IntersectingIterator() {}
 +  
 +  @Override
 +  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
 +    return new IntersectingIterator(this, env);
 +  }
 +  
 +  private IntersectingIterator(IntersectingIterator other, IteratorEnvironment env) {
 +    if (other.sources != null) {
 +      sourcesCount = other.sourcesCount;
 +      sources = new TermSource[sourcesCount];
 +      for (int i = 0; i < sourcesCount; i++) {
 +        sources[i] = new TermSource(other.sources[i].iter.deepCopy(env), other.sources[i].term);
 +      }
 +    }
 +  }
 +  
 +  @Override
 +  public Key getTopKey() {
 +    return topKey;
 +  }
 +  
 +  @Override
 +  public Value getTopValue() {
 +    // we don't really care about values
 +    return value;
 +  }
 +  
 +  @Override
 +  public boolean hasTop() {
 +    return currentPartition != null;
 +  }
 +  
 +  // precondition: currentPartition is not null
 +  private boolean seekOneSource(int sourceID) throws IOException {
 +    // find the next key in the appropriate column family that is at or beyond the cursor (currentPartition, currentDocID)
 +    // advance the cursor if this source goes beyond it
 +    // return whether we advanced the cursor
 +    
 +    // within this loop progress must be made in one of the following forms:
 +    // - currentPartition or currentDocID must be increased
 +    // - the given source must advance its iterator
 +    // this loop will end when any of the following criteria are met
 +    // - the iterator for the given source is pointing to the key (currentPartition, sources[sourceID].term, currentDocID)
 +    // - the given source is out of data and currentPartition is set to null
 +    // - the given source has advanced beyond the end of the overall range and currentPartition is set to null
 +    boolean advancedCursor = false;
 +    
 +    if (sources[sourceID].notFlag) {
 +      while (true) {
 +        if (sources[sourceID].iter.hasTop() == false) {
 +          // an empty column that you are negating is a valid condition
 +          break;
 +        }
 +        // check if we're past the end key
 +        int endCompare = -1;
 +        // we should compare the row to the end of the range
 +        if (overallRange.getEndKey() != null) {
 +          endCompare = overallRange.getEndKey().getRow().compareTo(sources[sourceID].iter.getTopKey().getRow());
 +          if ((!overallRange.isEndKeyInclusive() && endCompare <= 0) || endCompare < 0) {
 +            // an empty column that you are negating is a valid condition
 +            break;
 +          }
 +        }
 +        int partitionCompare = currentPartition.compareTo(getPartition(sources[sourceID].iter.getTopKey()));
 +        // check if this source is already at or beyond currentRow
 +        // if not, then seek to at least the current row
 +        
 +        if (partitionCompare > 0) {
 +          // seek to at least the currentRow
 +          Key seekKey = buildKey(currentPartition, sources[sourceID].term);
 +          sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
 +          continue;
 +        }
 +        // check if this source has gone beyond currentRow
 +        // if so, this is a valid condition for negation
 +        if (partitionCompare < 0) {
 +          break;
 +        }
 +        // we have verified that the current source is positioned in currentRow
 +        // now we must make sure we're in the right columnFamily in the current row
 +        // Note: Iterators are auto-magically set to the correct columnFamily
 +        if (sources[sourceID].term != null) {
 +          int termCompare = sources[sourceID].term.compareTo(getTerm(sources[sourceID].iter.getTopKey()));
 +          // check if this source is already on the right columnFamily
 +          // if not, then seek forwards to the right columnFamily
 +          if (termCompare > 0) {
 +            Key seekKey = buildKey(currentPartition, sources[sourceID].term, currentDocID);
 +            sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
 +            continue;
 +          }
 +          // check if this source is beyond the right columnFamily
 +          // if so, then this is a valid condition for negating
 +          if (termCompare < 0) {
 +            break;
 +          }
 +        }
 +        
 +        // we have verified that we are in currentRow and the correct column family
 +        // make sure we are at or beyond columnQualifier
 +        Text docID = getDocID(sources[sourceID].iter.getTopKey());
 +        int docIDCompare = currentDocID.compareTo(docID);
 +        // If we are past the target, this is a valid result
 +        if (docIDCompare < 0) {
 +          break;
 +        }
 +        // if this source is not yet at the currentCQ then advance in this source
 +        if (docIDCompare > 0) {
 +          // seek forwards
 +          Key seekKey = buildKey(currentPartition, sources[sourceID].term, currentDocID);
 +          sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
 +          continue;
 +        }
 +        // if we are equal to the target, this is an invalid result.
 +        // Force the entire process to go to the next row.
 +        // We are advancing column 0 because we forced that column to not contain a !
 +        // when we did the init()
 +        if (docIDCompare == 0) {
 +          sources[0].iter.next();
 +          advancedCursor = true;
 +          break;
 +        }
 +      }
 +    } else {
 +      while (true) {
 +        if (sources[sourceID].iter.hasTop() == false) {
 +          currentPartition = null;
 +          // setting currentRow to null counts as advancing the cursor
 +          return true;
 +        }
 +        // check if we're past the end key
 +        int endCompare = -1;
 +        // we should compare the row to the end of the range
 +        
 +        if (overallRange.getEndKey() != null) {
 +          endCompare = overallRange.getEndKey().getRow().compareTo(sources[sourceID].iter.getTopKey().getRow());
 +          if ((!overallRange.isEndKeyInclusive() && endCompare <= 0) || endCompare < 0) {
 +            currentPartition = null;
 +            // setting currentRow to null counts as advancing the cursor
 +            return true;
 +          }
 +        }
 +        int partitionCompare = currentPartition.compareTo(getPartition(sources[sourceID].iter.getTopKey()));
 +        // check if this source is already at or beyond currentRow
 +        // if not, then seek to at least the current row
 +        if (partitionCompare > 0) {
 +          // seek to at least the currentRow
 +          Key seekKey = buildKey(currentPartition, sources[sourceID].term);
 +          sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
 +          continue;
 +        }
 +        // check if this source has gone beyond currentRow
 +        // if so, advance currentRow
 +        if (partitionCompare < 0) {
 +          currentPartition.set(getPartition(sources[sourceID].iter.getTopKey()));
 +          currentDocID.set(emptyByteArray);
 +          advancedCursor = true;
 +          continue;
 +        }
 +        // we have verified that the current source is positioned in currentRow
 +        // now we must make sure we're in the right columnFamily in the current row
 +        // Note: Iterators are auto-magically set to the correct columnFamily
 +        
 +        if (sources[sourceID].term != null) {
 +          int termCompare = sources[sourceID].term.compareTo(getTerm(sources[sourceID].iter.getTopKey()));
 +          // check if this source is already on the right columnFamily
 +          // if not, then seek forwards to the right columnFamily
 +          if (termCompare > 0) {
 +            Key seekKey = buildKey(currentPartition, sources[sourceID].term, currentDocID);
 +            sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
 +            continue;
 +          }
 +          // check if this source is beyond the right columnFamily
 +          // if so, then seek to the next row
 +          if (termCompare < 0) {
 +            // we're out of entries in the current row, so seek to the next one
 +            // byte[] currentRowBytes = currentRow.getBytes();
 +            // byte[] nextRow = new byte[currentRowBytes.length + 1];
 +            // System.arraycopy(currentRowBytes, 0, nextRow, 0, currentRowBytes.length);
 +            // nextRow[currentRowBytes.length] = (byte)0;
 +            // // we should reuse text objects here
 +            // sources[sourceID].seek(new Key(new Text(nextRow),columnFamilies[sourceID]));
 +            if (endCompare == 0) {
 +              // we're done
 +              currentPartition = null;
 +              // setting currentRow to null counts as advancing the cursor
 +              return true;
 +            }
 +            Key seekKey = buildFollowingPartitionKey(sources[sourceID].iter.getTopKey());
 +            sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
 +            continue;
 +          }
 +        }
 +        // we have verified that we are in currentRow and the correct column family
 +        // make sure we are at or beyond columnQualifier
 +        Text docID = getDocID(sources[sourceID].iter.getTopKey());
 +        int docIDCompare = currentDocID.compareTo(docID);
 +        // if this source has advanced beyond the current column qualifier then advance currentCQ and return true
 +        if (docIDCompare < 0) {
 +          currentDocID.set(docID);
 +          advancedCursor = true;
 +          break;
 +        }
 +        // if this source is not yet at the currentCQ then seek in this source
 +        if (docIDCompare > 0) {
 +          // seek forwards
 +          Key seekKey = buildKey(currentPartition, sources[sourceID].term, currentDocID);
 +          sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
 +          continue;
 +        }
 +        // this source is at the current row, in its column family, and at currentCQ
 +        break;
 +      }
 +    }
 +    return advancedCursor;
 +  }
 +  
 +  @Override
 +  public void next() throws IOException {
 +    if (currentPartition == null) {
 +      return;
 +    }
 +    // precondition: the current row is set up and the sources all have the same column qualifier
 +    // while we don't have a match, seek in the source with the smallest column qualifier
 +    sources[0].iter.next();
 +    advanceToIntersection();
 +  }
 +  
 +  protected void advanceToIntersection() throws IOException {
 +    boolean cursorChanged = true;
 +    while (cursorChanged) {
 +      // seek all of the sources to at least the highest seen column qualifier in the current row
 +      cursorChanged = false;
 +      for (int i = 0; i < sourcesCount; i++) {
 +        if (currentPartition == null) {
 +          topKey = null;
 +          return;
 +        }
 +        if (seekOneSource(i)) {
 +          cursorChanged = true;
 +          break;
 +        }
 +      }
 +    }
 +    topKey = buildKey(currentPartition, nullText, currentDocID);
 +  }
 +  
 +  public static String stringTopKey(SortedKeyValueIterator<Key,Value> iter) {
 +    if (iter.hasTop())
 +      return iter.getTopKey().toString();
 +    return "";
 +  }
 +  
 +  private static final String columnFamiliesOptionName = "columnFamilies";
 +  private static final String notFlagOptionName = "notFlag";
 +  
 +  /**
-    * @param columns
 +   * @return encoded columns
 +   */
 +  protected static String encodeColumns(Text[] columns) {
 +    StringBuilder sb = new StringBuilder();
 +    for (int i = 0; i < columns.length; i++) {
 +      sb.append(new String(Base64.encodeBase64(TextUtil.getBytes(columns[i])), Constants.UTF8));
 +      sb.append('\n');
 +    }
 +    return sb.toString();
 +  }
 +  
 +  /**
-    * @param flags
 +   * @return encoded flags
 +   */
 +  protected static String encodeBooleans(boolean[] flags) {
 +    byte[] bytes = new byte[flags.length];
 +    for (int i = 0; i < flags.length; i++) {
 +      if (flags[i])
 +        bytes[i] = 1;
 +      else
 +        bytes[i] = 0;
 +    }
 +    return new String(Base64.encodeBase64(bytes), Constants.UTF8);
 +  }
 +  
 +  protected static Text[] decodeColumns(String columns) {
 +    String[] columnStrings = columns.split("\n");
 +    Text[] columnTexts = new Text[columnStrings.length];
 +    for (int i = 0; i < columnStrings.length; i++) {
 +      columnTexts[i] = new Text(Base64.decodeBase64(columnStrings[i].getBytes(Constants.UTF8)));
 +    }
 +    return columnTexts;
 +  }
 +  
 +  /**
-    * @param flags
 +   * @return decoded flags
 +   */
 +  protected static boolean[] decodeBooleans(String flags) {
 +    // return null if there were no flags
 +    if (flags == null)
 +      return null;
 +    
 +    byte[] bytes = Base64.decodeBase64(flags.getBytes(Constants.UTF8));
 +    boolean[] bFlags = new boolean[bytes.length];
 +    for (int i = 0; i < bytes.length; i++) {
 +      if (bytes[i] == 1)
 +        bFlags[i] = true;
 +      else
 +        bFlags[i] = false;
 +    }
 +    return bFlags;
 +  }
 +  
 +  @Override
 +  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
 +    Text[] terms = decodeColumns(options.get(columnFamiliesOptionName));
 +    boolean[] notFlag = decodeBooleans(options.get(notFlagOptionName));
 +    
 +    if (terms.length < 2) {
 +      throw new IllegalArgumentException("IntersectionIterator requires two or more columns families");
 +    }
 +    
 +    // Scan the not flags.
 +    // There must be at least one term that isn't negated
 +    // And we are going to re-order such that the first term is not a ! term
 +    if (notFlag == null) {
 +      notFlag = new boolean[terms.length];
 +      for (int i = 0; i < terms.length; i++)
 +        notFlag[i] = false;
 +    }
 +    if (notFlag[0]) {
 +      for (int i = 1; i < notFlag.length; i++) {
 +        if (notFlag[i] == false) {
 +          Text swapFamily = new Text(terms[0]);
 +          terms[0].set(terms[i]);
 +          terms[i].set(swapFamily);
 +          notFlag[0] = false;
 +          notFlag[i] = true;
 +          break;
 +        }
 +      }
 +      if (notFlag[0]) {
 +        throw new IllegalArgumentException("IntersectingIterator requires at least one column family without the NOT flag");
 +      }
 +    }
 +    
 +    sources = new TermSource[terms.length];
 +    sources[0] = new TermSource(source, terms[0]);
 +    for (int i = 1; i < terms.length; i++) {
 +      sources[i] = new TermSource(source.deepCopy(env), terms[i], notFlag[i]);
 +    }
 +    sourcesCount = terms.length;
 +  }
 +  
 +  @Override
 +  public void seek(Range range, Collection<ByteSequence> seekColumnFamilies, boolean inclusive) throws IOException {
 +    overallRange = new Range(range);
 +    currentPartition = new Text();
 +    currentDocID.set(emptyByteArray);
 +    
 +    // seek each of the sources to the right column family within the row given by key
 +    for (int i = 0; i < sourcesCount; i++) {
 +      Key sourceKey;
 +      if (range.getStartKey() != null) {
 +        if (range.getStartKey().getColumnQualifier() != null) {
 +          sourceKey = buildKey(getPartition(range.getStartKey()), sources[i].term, range.getStartKey().getColumnQualifier());
 +        } else {
 +          sourceKey = buildKey(getPartition(range.getStartKey()), sources[i].term);
 +        }
 +        // Seek only to the term for this source as a column family
 +        sources[i].iter.seek(new Range(sourceKey, true, null, false), sources[i].seekColfams, true);
 +      } else {
 +        // Seek only to the term for this source as a column family
 +        sources[i].iter.seek(range, sources[i].seekColfams, true);
 +      }
 +    }
 +    advanceToIntersection();
 +  }
 +  
 +  public void addSource(SortedKeyValueIterator<Key,Value> source, IteratorEnvironment env, Text term, boolean notFlag) {
 +    // Check if we have space for the added Source
 +    if (sources == null) {
 +      sources = new TermSource[1];
 +    } else {
 +      // allocate space for the new source, and copy the current sources.
 +      // TODO: Should we change this to an ArrayList so that we can just add() ? - ACCUMULO-1309
 +      TermSource[] localSources = new TermSource[sources.length + 1];
 +      int currSource = 0;
 +      for (TermSource myTerm : sources) {
 +        // TODO: Do I need to call new here? or can I just re-use the term? - ACCUMULO-1309
 +        localSources[currSource] = new TermSource(myTerm);
 +        currSource++;
 +      }
 +      sources = localSources;
 +    }
 +    sources[sourcesCount] = new TermSource(source.deepCopy(env), term, notFlag);
 +    sourcesCount++;
 +  }
 +  
 +  /**
 +   * Encode the columns to be used when iterating.
-    * 
-    * @param cfg
-    * @param columns
 +   */
 +  public static void setColumnFamilies(IteratorSetting cfg, Text[] columns) {
 +    if (columns.length < 2)
 +      throw new IllegalArgumentException("Must supply at least two terms to intersect");
 +    cfg.addOption(IntersectingIterator.columnFamiliesOptionName, IntersectingIterator.encodeColumns(columns));
 +  }
 +  
 +  /**
 +   * Encode columns and NOT flags indicating which columns should be negated (docIDs will be excluded if matching negated columns, instead of included).
-    * 
-    * @param cfg
-    * @param columns
-    * @param notFlags
 +   */
 +  public static void setColumnFamilies(IteratorSetting cfg, Text[] columns, boolean[] notFlags) {
 +    if (columns.length < 2)
 +      throw new IllegalArgumentException("Must supply at least two terms to intersect");
 +    if (columns.length != notFlags.length)
 +      throw new IllegalArgumentException("columns and notFlags arrays must be the same length");
 +    setColumnFamilies(cfg, columns);
 +    cfg.addOption(IntersectingIterator.notFlagOptionName, IntersectingIterator.encodeBooleans(notFlags));
 +  }
 +}
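
For context, here is a minimal, hypothetical sketch of wiring the IntersectingIterator above into a BatchScanner. The table name "shardIndex", the Connector, and the terms are illustrative assumptions and are not part of this change; IntersectingIterator and setColumnFamilies are the class and method shown in the diff above.

import java.util.Collections;
import java.util.Map.Entry;

import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.user.IntersectingIterator;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.hadoop.io.Text;

public class IntersectingIteratorExample {
  // Hypothetical query: print every docID indexed under all of the given terms
  // (at least two). Assumes a sharded index table named "shardIndex" laid out
  // as row=shardID, colfam=term, colqual=docID.
  public static void printMatches(Connector connector, Text[] terms) throws TableNotFoundException {
    BatchScanner bs = connector.createBatchScanner("shardIndex", new Authorizations(), 10);
    try {
      IteratorSetting cfg = new IteratorSetting(20, "ii", IntersectingIterator.class);
      // encodes the terms into the columnFamilies option shown above
      IntersectingIterator.setColumnFamilies(cfg, terms);
      bs.addScanIterator(cfg);
      bs.setRanges(Collections.singleton(new Range())); // search every shard in parallel
      for (Entry<Key,Value> entry : bs) {
        // matching docIDs come back in the column qualifier; the column family is empty
        System.out.println(entry.getKey().getColumnQualifier());
      }
    } finally {
      bs.close();
    }
  }
}

To negate a term, the three-argument setColumnFamilies overload above takes a parallel boolean[] of NOT flags; at least one term must remain un-negated.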

http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/core/src/main/java/org/apache/accumulo/core/iterators/user/RowFilter.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/iterators/user/RowFilter.java
index a232796,0000000..2d2fa74
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/user/RowFilter.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/user/RowFilter.java
@@@ -1,165 -1,0 +1,164 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.iterators.user;
 +
 +import java.io.IOException;
 +import java.util.Collection;
 +import java.util.Map;
 +
 +import org.apache.accumulo.core.client.BatchScanner;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.data.ByteSequence;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.iterators.IteratorEnvironment;
 +import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 +import org.apache.accumulo.core.iterators.WrappingIterator;
 +import org.apache.hadoop.io.Text;
 +
 +/**
 + * This iterator makes it easy to select rows that meet given criteria. It is an alternative to the {@link WholeRowIterator}. There are a few things to consider
 + * when deciding which one to use.
 + * 
 + * First, the WholeRowIterator requires that the row fit in memory and that the entire row is read before a decision is made. This iterator has neither
 + * requirement; it allows seeking within a row to avoid reading the entire row to make a decision. So even if your rows fit into memory, extending this
 + * iterator may be a better choice because you can seek.
 + * 
 + * Second, the WholeRowIterator is currently the only way to achieve row isolation with the {@link BatchScanner}. With the normal {@link Scanner}, row
 + * isolation can be enabled and this iterator may be used.
 + * 
 + * Third, the row acceptance test will be executed every time this iterator is seeked. If the row is large, it will be fetched in batches of key/values. As
 + * each batch is fetched, the test may be re-executed because the iterator stack is reseeked for each batch. The batch size may be increased to reduce the
 + * number of times the test is executed. With the normal Scanner, if isolation is enabled, it will read an entire row without seeking this iterator.
 + * 
 + */
 +public abstract class RowFilter extends WrappingIterator {
 +  
 +  private RowIterator decisionIterator;
 +  private Collection<ByteSequence> columnFamilies;
 +  Text currentRow;
 +  private boolean inclusive;
 +  private Range range;
 +  private boolean hasTop;
 +
 +  private static class RowIterator extends WrappingIterator {
 +    private Range rowRange;
 +    private boolean hasTop;
 +    
 +    RowIterator(SortedKeyValueIterator<Key,Value> source) {
 +      super.setSource(source);
 +    }
 +    
 +    void setRow(Range row) {
 +      this.rowRange = row;
 +    }
 +    
 +    @Override
 +    public boolean hasTop() {
 +      return hasTop && super.hasTop();
 +    }
 +    
 +    @Override
 +    public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
 +      
 +      range = rowRange.clip(range, true);
 +      if (range == null) {
 +        hasTop = false;
 +      } else {
 +        hasTop = true;
 +        super.seek(range, columnFamilies, inclusive);
 +      }
 +    }
 +  }
 +
 +  private void skipRows() throws IOException {
 +    SortedKeyValueIterator<Key,Value> source = getSource();
 +    while (source.hasTop()) {
 +      Text row = source.getTopKey().getRow();
 +      
 +      if (currentRow != null && currentRow.equals(row))
 +        break;
 +      
 +      Range rowRange = new Range(row);
 +      decisionIterator.setRow(rowRange);
 +      decisionIterator.seek(rowRange, columnFamilies, inclusive);
 +      
 +      if (acceptRow(decisionIterator)) {
 +        currentRow = row;
 +        break;
 +      } else {
 +        currentRow = null;
 +        int count = 0;
 +        while (source.hasTop() && count < 10 && source.getTopKey().getRow().equals(row)) {
 +          count++;
 +          source.next();
 +        }
 +        
 +        if (source.hasTop() && source.getTopKey().getRow().equals(row)) {
 +          Range nextRow = new Range(row, false, null, false);
 +          nextRow = range.clip(nextRow, true);
 +          if (nextRow == null)
 +            hasTop = false;
 +          else
 +            source.seek(nextRow, columnFamilies, inclusive);
 +        }
 +      }
 +    }
 +  }
 +  
 +  /**
 +   * Implementations should return false to suppress a row.
 +   * 
 +   * 
 +   * @param rowIterator
 +   *          - An iterator over the row. This iterator is confined to the row. Seeking past the end of the row will return no data. Seeking before the row will
 +   *          always set top to the first column in the current row. By default this iterator will only see the columns the parent was seeked with. To see more
 +   *          columns reseek this iterator with those columns.
 +   * @return false if a row should be suppressed, otherwise true.
-    * @throws IOException
 +   */
 +  public abstract boolean acceptRow(SortedKeyValueIterator<Key,Value> rowIterator) throws IOException;
 +
 +  @Override
 +  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
 +    super.init(source, options, env);
 +    this.decisionIterator = new RowIterator(source.deepCopy(env));
 +  }
 +  
 +  @Override
 +  public boolean hasTop() {
 +    return hasTop && super.hasTop();
 +  }
 +  
 +  @Override
 +  public void next() throws IOException {
 +    super.next();
 +    skipRows();
 +  }
 +  
 +  @Override
 +  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
 +    super.seek(range, columnFamilies, inclusive);
 +    this.columnFamilies = columnFamilies;
 +    this.inclusive = inclusive;
 +    this.range = range;
 +    currentRow = null;
 +    hasTop = true;
 +    skipRows();
 +    
 +  }
 +}
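
As a hedged illustration of the extension point documented above, the following hypothetical subclass keeps only rows that contain at least one entry in an assumed column family "keep". The class name and column family are invented for the example; RowFilter and acceptRow are the real class and hook from the file above.

import java.io.IOException;

import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.user.RowFilter;
import org.apache.hadoop.io.Text;

public class ColumnExistsRowFilter extends RowFilter {
  // Hypothetical required family. Per the javadoc above, the row iterator only
  // sees the columns the parent was seeked with, so "keep" must be among them
  // (or the implementation must reseek the row iterator to include it).
  private static final Text REQUIRED = new Text("keep");

  @Override
  public boolean acceptRow(SortedKeyValueIterator<Key,Value> rowIterator) throws IOException {
    // rowIterator is confined to the current row; stop at the first match
    while (rowIterator.hasTop()) {
      if (rowIterator.getTopKey().compareColumnFamily(REQUIRED) == 0)
        return true;
      rowIterator.next();
    }
    return false; // suppress rows without the required family
  }
}

A client would attach this like any other iterator, e.g. with new IteratorSetting(30, "rowFilter", ColumnExistsRowFilter.class) on a Scanner with isolation enabled, per the second point in the javadoc above.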