Posted to commits@accumulo.apache.org by ct...@apache.org on 2014/04/09 19:57:42 UTC

[11/64] [abbrv] Merge branch '1.4.6-SNAPSHOT' into 1.5.2-SNAPSHOT

http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/core/src/main/java/org/apache/accumulo/core/data/Range.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/data/Range.java
index 65873c3,0000000..122436b
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/data/Range.java
+++ b/core/src/main/java/org/apache/accumulo/core/data/Range.java
@@@ -1,906 -1,0 +1,899 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.data;
 +
 +import java.io.DataInput;
 +import java.io.DataOutput;
- import java.io.InvalidObjectException;
 +import java.io.IOException;
++import java.io.InvalidObjectException;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.List;
 +
 +import org.apache.accumulo.core.data.thrift.TRange;
 +import org.apache.hadoop.io.Text;
 +import org.apache.hadoop.io.WritableComparable;
 +
 +/**
 + * This class is used to specify a range of Accumulo Keys.
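 + * 
 + * For example (an illustrative note): new Range("a", "z") covers all rows from "a" through "z" inclusive, while new
 + * Range() covers all keys.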
 + * 
 + */
 +
 +public class Range implements WritableComparable<Range> {
 +  
 +  private Key start;
 +  private Key stop;
 +  private boolean startKeyInclusive;
 +  private boolean stopKeyInclusive;
 +  private boolean infiniteStartKey;
 +  private boolean infiniteStopKey;
 +  
 +  /**
 +   * Creates a range that goes from negative to positive infinity
 +   */
 +  
 +  public Range() {
 +    this((Key) null, true, (Key) null, true);
 +  }
 +  
 +  /**
 +   * Creates a range from startKey inclusive to endKey inclusive
 +   * 
 +   * @param startKey
 +   *          set this to null when negative infinity is needed
 +   * @param endKey
 +   *          set this to null when positive infinity is needed
 +   */
 +  public Range(Key startKey, Key endKey) {
 +    this(startKey, true, endKey, true);
 +  }
 +  
 +  /**
 +   * Creates a range that covers an entire row
 +   * 
 +   * @param row
 +   *          set this to null to cover all rows
 +   */
 +  public Range(CharSequence row) {
 +    this(row, true, row, true);
 +  }
 +  
 +  /**
 +   * Creates a range that covers an entire row
 +   * 
 +   * @param row
 +   *          set this to null to cover all rows
 +   */
 +  public Range(Text row) {
 +    this(row, true, row, true);
 +  }
 +  
 +  /**
 +   * Creates a range from startRow inclusive to endRow inclusive
 +   * 
 +   * @param startRow
 +   *          set this to null when negative infinity is needed
 +   * @param endRow
 +   *          set this to null when positive infinity is needed
 +   */
 +  public Range(Text startRow, Text endRow) {
 +    this(startRow, true, endRow, true);
 +  }
 +  
 +  /**
 +   * Creates a range from startRow inclusive to endRow inclusive
 +   * 
 +   * @param startRow
 +   *          set this to null when negative infinity is needed
 +   * @param endRow
 +   *          set this to null when positive infinity is needed
 +   */
 +  public Range(CharSequence startRow, CharSequence endRow) {
 +    this(startRow, true, endRow, true);
 +  }
 +  
 +  /**
 +   * Creates a range from startRow to endRow
 +   * 
 +   * @param startRow
 +   *          set this to null when negative infinity is needed
 +   * @param startRowInclusive
 +   *          determines if the range includes the start row
 +   * @param endRow
 +   *          set this to null when positive infinity is needed
 +   * @param endRowInclusive
 +   *          determines if the range includes the end row
 +   */
 +  
 +  public Range(Text startRow, boolean startRowInclusive, Text endRow, boolean endRowInclusive) {
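 +    // row bounds are mapped to key bounds: an exclusive start row becomes the
 +    // key just past that row (followingKey(PartialKey.ROW)), and an inclusive
 +    // end row becomes an exclusive stop at the key just past that row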
 +    this((startRow == null ? null : (startRowInclusive ? new Key(startRow) : new Key(startRow).followingKey(PartialKey.ROW))), true, (endRow == null ? null
 +        : (endRowInclusive ? new Key(endRow).followingKey(PartialKey.ROW) : new Key(endRow))), false);
 +  }
 +  
 +  /**
 +   * Creates a range from startRow to endRow
 +   * 
 +   * @param startRow
 +   *          set this to null when negative infinity is needed
 +   * @param startRowInclusive
 +   *          determines if the range includes the start row
 +   * @param endRow
 +   *          set this to null when positive infinity is needed
 +   * @param endRowInclusive
 +   *          determines if the range includes the end row
 +   */
 +  
 +  public Range(CharSequence startRow, boolean startRowInclusive, CharSequence endRow, boolean endRowInclusive) {
 +    this(startRow == null ? null : new Text(startRow.toString()), startRowInclusive, endRow == null ? null : new Text(endRow.toString()), endRowInclusive);
 +  }
 +  
 +  /**
 +   * Creates a range from startKey to endKey
 +   * 
 +   * @param startKey
 +   *          set this to null when negative infinity is needed
 +   * @param startKeyInclusive
 +   *          determines if the range includes the start key
 +   * @param endKey
 +   *          set this to null when positive infinity is needed
 +   * @param endKeyInclusive
 +   *          determines if the range includes the end key
 +   */
 +  public Range(Key startKey, boolean startKeyInclusive, Key endKey, boolean endKeyInclusive) {
 +    this.start = startKey;
 +    this.startKeyInclusive = startKeyInclusive;
 +    this.infiniteStartKey = startKey == null;
 +    this.stop = endKey;
 +    this.stopKeyInclusive = endKeyInclusive;
 +    this.infiniteStopKey = stop == null;
 +    
 +    if (!infiniteStartKey && !infiniteStopKey && beforeStartKey(endKey)) {
 +      throw new IllegalArgumentException("Start key must be less than end key in range (" + startKey + ", " + endKey + ")");
 +    }
 +  }
 +  
 +  /**
 +   * Copies a range
 +   */
 +  public Range(Range range) {
 +    this(range.start, range.startKeyInclusive, range.infiniteStartKey, range.stop, range.stopKeyInclusive, range.infiniteStopKey);
 +  }
 +  
 +  /**
 +   * Creates a range from start to stop.
 +   *
 +   * @param start
 +   *          set this to null when negative infinity is needed
 +   * @param stop
 +   *          set this to null when positive infinity is needed
 +   * @param startKeyInclusive
 +   *          determines if the range includes the start key
 +   * @param stopKeyInclusive
 +   *          determines if the range includes the end key
 +   * @param infiniteStartKey
 +   *          true if start key is negative infinity (null)
 +   * @param infiniteStopKey
 +   *          true if stop key is positive infinity (null)
 +   * @throws IllegalArgumentException if stop is before start, or infiniteStartKey is true but start is not null, or infiniteStopKey is true but stop is not
 +   *          null
 +   */
 +  public Range(Key start, Key stop, boolean startKeyInclusive, boolean stopKeyInclusive, boolean infiniteStartKey, boolean infiniteStopKey) {
 +    this(start, startKeyInclusive, infiniteStartKey, stop, stopKeyInclusive, infiniteStopKey);
 +    if (!infiniteStartKey && !infiniteStopKey && beforeStartKey(stop)) {
 +      throw new IllegalArgumentException("Start key must be less than end key in range (" + start + ", " + stop + ")");
 +    }
 +  }
 +
 +  /**
 +   * Creates a range from start to stop. Unlike the public six-argument method,
 +   * this one does not assure that stop is after start, which helps performance
 +   * in cases where that assurance is already in place.
 +   *
 +   * @param start
 +   *          set this to null when negative infinity is needed
 +   * @param startKeyInclusive
 +   *          determines if the range includes the start key
 +   * @param infiniteStartKey
 +   *          true if start key is negative infinity (null)
 +   * @param stop
 +   *          set this to null when positive infinity is needed
 +   * @param stopKeyInclusive
 +   *          determines if the range includes the end key
 +   * @param infiniteStopKey
 +   *          true if stop key is positive infinity (null)
 +   * @throws IllegalArgumentException if infiniteStartKey is true but start is not null, or infiniteStopKey is true but stop is not null
 +   */
 +  protected Range(Key start, boolean startKeyInclusive, boolean infiniteStartKey, Key stop, boolean stopKeyInclusive, boolean infiniteStopKey) {
 +    if (infiniteStartKey && start != null)
 +      throw new IllegalArgumentException();
 +    
 +    if (infiniteStopKey && stop != null)
 +      throw new IllegalArgumentException();
 +    
 +    this.start = start;
 +    this.stop = stop;
 +    this.startKeyInclusive = startKeyInclusive;
 +    this.stopKeyInclusive = stopKeyInclusive;
 +    this.infiniteStartKey = infiniteStartKey;
 +    this.infiniteStopKey = infiniteStopKey;
 +  }
 +  
 +  public Range(TRange trange) {
 +    this(trange.start == null ? null : new Key(trange.start), trange.startKeyInclusive, trange.infiniteStartKey,
 +        trange.stop == null ? null : new Key(trange.stop), trange.stopKeyInclusive, trange.infiniteStopKey);
 +    if (!infiniteStartKey && !infiniteStopKey && beforeStartKey(stop)) {
 +      throw new IllegalArgumentException("Start key must be less than end key in range (" + start + ", " + stop + ")");
 +    }
 +  }
 +  
 +  /**
 +   * @return the first key in the range, null if the key is infinite
 +   */
 +  public Key getStartKey() {
 +    if (infiniteStartKey) {
 +      return null;
 +    }
 +    return start;
 +  }
 +  
 +  /**
 +   * 
-    * @param key
 +   * @return true if the given key is before the range, otherwise false
 +   */
 +  public boolean beforeStartKey(Key key) {
 +    if (infiniteStartKey) {
 +      return false;
 +    }
 +    
 +    if (startKeyInclusive)
 +      return key.compareTo(start) < 0;
 +    return key.compareTo(start) <= 0;
 +  }
 +  
 +  /**
 +   * @return the last key in the range, null if the end key is infinite
 +   */
 +  
 +  public Key getEndKey() {
 +    if (infiniteStopKey) {
 +      return null;
 +    }
 +    return stop;
 +  }
 +  
 +  /**
-    * @param key
 +   * @return true if the given key is after the range, otherwise false
 +   */
 +  
 +  public boolean afterEndKey(Key key) {
 +    if (infiniteStopKey)
 +      return false;
 +    
 +    if (stopKeyInclusive)
 +      return stop.compareTo(key) < 0;
 +    return stop.compareTo(key) <= 0;
 +  }
 +  
 +  @Override
 +  public int hashCode() {
 +    int startHash = infiniteStartKey ? 0 : start.hashCode() + (startKeyInclusive ? 1 : 0);
 +    int stopHash = infiniteStopKey ? 0 : stop.hashCode() + (stopKeyInclusive ? 1 : 0);
 +    
 +    return startHash + stopHash;
 +  }
 +  
 +  @Override
 +  public boolean equals(Object o) {
 +    if (o instanceof Range)
 +      return equals((Range) o);
 +    return false;
 +  }
 +  
 +  public boolean equals(Range otherRange) {
 +    
 +    return compareTo(otherRange) == 0;
 +  }
 +  
 +  /**
 +   * Compares this range to another range. Compares in the order start key, inclusiveness of start key, end key, inclusiveness of end key. Infinite keys sort
 +   * first, and non-infinite keys are compared with {@link Key#compareTo(Key)}. Inclusive sorts before non-inclusive.
 +   */
++  @Override
 +  public int compareTo(Range o) {
 +    int comp;
 +    
 +    if (infiniteStartKey)
 +      if (o.infiniteStartKey)
 +        comp = 0;
 +      else
 +        comp = -1;
 +    else if (o.infiniteStartKey)
 +      comp = 1;
 +    else {
 +      comp = start.compareTo(o.start);
 +      if (comp == 0)
 +        if (startKeyInclusive && !o.startKeyInclusive)
 +          comp = -1;
 +        else if (!startKeyInclusive && o.startKeyInclusive)
 +          comp = 1;
 +      
 +    }
 +    
 +    if (comp == 0)
 +      if (infiniteStopKey)
 +        if (o.infiniteStopKey)
 +          comp = 0;
 +        else
 +          comp = 1;
 +      else if (o.infiniteStopKey)
 +        comp = -1;
 +      else {
 +        comp = stop.compareTo(o.stop);
 +        if (comp == 0)
 +          if (stopKeyInclusive && !o.stopKeyInclusive)
 +            comp = 1;
 +          else if (!stopKeyInclusive && o.stopKeyInclusive)
 +            comp = -1;
 +      }
 +    
 +    return comp;
 +  }
 +  
 +  /**
 +   * 
-    * @param key
 +   * @return true if the given key falls within the range
 +   */
 +  public boolean contains(Key key) {
 +    return !beforeStartKey(key) && !afterEndKey(key);
 +  }
 +  
 +  /**
 +   * Takes a collection of ranges and merges overlapping and adjacent ranges. For example, given the following input
 +   * 
 +   * <pre>
 +   * [a,c], (c, d], (g,m), (j,t]
 +   * </pre>
 +   * 
 +   * the following ranges would be returned
 +   * 
 +   * <pre>
 +   * [a,d], (g,t]
 +   * </pre>
 +   * 
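 +   * A hedged usage sketch (assumes java.util.Arrays is available):
 +   * 
 +   * <pre>
 +   * List&lt;Range&gt; merged = Range.mergeOverlapping(Arrays.asList(new Range(&quot;a&quot;, &quot;c&quot;), new Range(&quot;b&quot;, &quot;d&quot;)));
 +   * // merged holds a single range equal to new Range(&quot;a&quot;, &quot;d&quot;)
 +   * </pre>
 +   * 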
-    * @param ranges
 +   * @return list of merged ranges
 +   */
 +  
 +  public static List<Range> mergeOverlapping(Collection<Range> ranges) {
 +    if (ranges.size() == 0)
 +      return Collections.emptyList();
 +    
 +    List<Range> ral = new ArrayList<Range>(ranges);
 +    Collections.sort(ral);
 +    
 +    ArrayList<Range> ret = new ArrayList<Range>(ranges.size());
 +    
 +    Range currentRange = ral.get(0);
 +    boolean currentStartKeyInclusive = ral.get(0).startKeyInclusive;
 +    
 +    for (int i = 1; i < ral.size(); i++) {
 +      // because of inclusive switch, equal keys may not be seen
 +      
 +      if (currentRange.infiniteStopKey) {
 +        // this range has the minimal start key and
 +        // an infinite end key so it will contain all
 +        // other ranges
 +        break;
 +      }
 +      
 +      Range range = ral.get(i);
 +      
 +      boolean startKeysEqual;
 +      if (range.infiniteStartKey) {
 +        // previous start key must be infinite because the list is sorted
 +        assert (currentRange.infiniteStartKey);
 +        startKeysEqual = true;
 +      } else if (currentRange.infiniteStartKey) {
 +        startKeysEqual = false;
 +      } else if (currentRange.start.equals(range.start)) {
 +        startKeysEqual = true;
 +      } else {
 +        startKeysEqual = false;
 +      }
 +      
 +      if (startKeysEqual || currentRange.contains(range.start)
 +          || (!currentRange.stopKeyInclusive && range.startKeyInclusive && range.start.equals(currentRange.stop))) {
 +        int cmp;
 +        
 +        if (range.infiniteStopKey || (cmp = range.stop.compareTo(currentRange.stop)) > 0 || (cmp == 0 && range.stopKeyInclusive)) {
 +          currentRange = new Range(currentRange.getStartKey(), currentStartKeyInclusive, range.getEndKey(), range.stopKeyInclusive);
 +        }/* else currentRange contains ral.get(i) */
 +      } else {
 +        ret.add(currentRange);
 +        currentRange = range;
 +        currentStartKeyInclusive = range.startKeyInclusive;
 +      }
 +    }
 +    
 +    ret.add(currentRange);
 +    
 +    return ret;
 +  }
 +  
 +  /**
 +   * Creates a range which represents the intersection of this range and the passed in range. The following example will print true.
 +   * 
 +   * <pre>
 +   * Range range1 = new Range(&quot;a&quot;, &quot;f&quot;);
 +   * Range range2 = new Range(&quot;c&quot;, &quot;n&quot;);
 +   * Range range3 = range1.clip(range2);
 +   * System.out.println(range3.equals(new Range(&quot;c&quot;, &quot;f&quot;)));
 +   * </pre>
 +   * 
-    * @param range
 +   * @return the intersection
 +   * @throws IllegalArgumentException
 +   *           if the ranges do not overlap
 +   */
 +  
 +  public Range clip(Range range) {
 +    return clip(range, false);
 +  }
 +  
 +  /**
 +   * Same as the other clip function, except it gives the option to return null if the ranges do not overlap, instead of throwing an exception.
 +   * 
 +   * @see Range#clip(Range)
-    * @param range
 +   * @param returnNullIfDisjoint
 +   *          If the ranges do not overlap and true is passed, then null is returned; otherwise an exception is thrown.
 +   * @return the intersection
 +   */
 +  
 +  public Range clip(Range range, boolean returnNullIfDisjoint) {
 +    
 +    Key sk = range.getStartKey();
 +    boolean ski = range.isStartKeyInclusive();
 +    
 +    Key ek = range.getEndKey();
 +    boolean eki = range.isEndKeyInclusive();
 +    
 +    if (range.getStartKey() == null) {
 +      if (getStartKey() != null) {
 +        sk = getStartKey();
 +        ski = isStartKeyInclusive();
 +      }
 +    } else if (afterEndKey(range.getStartKey())
 +        || (getEndKey() != null && range.getStartKey().equals(getEndKey()) && !(range.isStartKeyInclusive() && isEndKeyInclusive()))) {
 +      if (returnNullIfDisjoint)
 +        return null;
 +      throw new IllegalArgumentException("Range " + range + " does not overlap " + this);
 +    } else if (beforeStartKey(range.getStartKey())) {
 +      sk = getStartKey();
 +      ski = isStartKeyInclusive();
 +    }
 +    
 +    if (range.getEndKey() == null) {
 +      if (getEndKey() != null) {
 +        ek = getEndKey();
 +        eki = isEndKeyInclusive();
 +      }
 +    } else if (beforeStartKey(range.getEndKey())
 +        || (getStartKey() != null && range.getEndKey().equals(getStartKey()) && !(range.isEndKeyInclusive() && isStartKeyInclusive()))) {
 +      if (returnNullIfDisjoint)
 +        return null;
 +      throw new IllegalArgumentException("Range " + range + " does not overlap " + this);
 +    } else if (afterEndKey(range.getEndKey())) {
 +      ek = getEndKey();
 +      eki = isEndKeyInclusive();
 +    }
 +    
 +    return new Range(sk, ski, ek, eki);
 +  }
 +  
 +  /**
 +   * Creates a new range that is bounded by the columns passed in. The start key in the returned range will have a column >= the minimum column. The end key
 +   * in the returned range will have a column <= the maximum column.
 +   * 
-    * 
-    * @param min
-    * @param max
 +   * @return a column bounded range
 +   * @throws IllegalArgumentException
 +   *           if min > max
 +   */
 +  
 +  public Range bound(Column min, Column max) {
 +    
 +    if (min.compareTo(max) > 0) {
 +      throw new IllegalArgumentException("min column > max column " + min + " " + max);
 +    }
 +    
 +    Key sk = getStartKey();
 +    boolean ski = isStartKeyInclusive();
 +    
 +    if (sk != null) {
 +      
 +      ByteSequence cf = sk.getColumnFamilyData();
 +      ByteSequence cq = sk.getColumnQualifierData();
 +      
 +      ByteSequence mincf = new ArrayByteSequence(min.columnFamily);
 +      ByteSequence mincq;
 +      
 +      if (min.columnQualifier != null)
 +        mincq = new ArrayByteSequence(min.columnQualifier);
 +      else
 +        mincq = new ArrayByteSequence(new byte[0]);
 +      
 +      int cmp = cf.compareTo(mincf);
 +      
 +      if (cmp < 0 || (cmp == 0 && cq.compareTo(mincq) < 0)) {
 +        ski = true;
 +        sk = new Key(sk.getRowData().toArray(), mincf.toArray(), mincq.toArray(), new byte[0], Long.MAX_VALUE, true);
 +      }
 +    }
 +    
 +    Key ek = getEndKey();
 +    boolean eki = isEndKeyInclusive();
 +    
 +    if (ek != null) {
 +      ByteSequence row = ek.getRowData();
 +      ByteSequence cf = ek.getColumnFamilyData();
 +      ByteSequence cq = ek.getColumnQualifierData();
 +      ByteSequence cv = ek.getColumnVisibilityData();
 +      
 +      ByteSequence maxcf = new ArrayByteSequence(max.columnFamily);
 +      ByteSequence maxcq = null;
 +      if (max.columnQualifier != null)
 +        maxcq = new ArrayByteSequence(max.columnQualifier);
 +      
 +      boolean set = false;
 +      
 +      int comp = cf.compareTo(maxcf);
 +      
 +      if (comp > 0) {
 +        set = true;
 +      } else if (comp == 0 && maxcq != null && cq.compareTo(maxcq) > 0) {
 +        set = true;
 +      } else if (!eki && row.length() > 0 && row.byteAt(row.length() - 1) == 0 && cf.length() == 0 && cq.length() == 0 && cv.length() == 0
 +          && ek.getTimestamp() == Long.MAX_VALUE) {
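 +        // this end key has the shape produced by followingKey(PartialKey.ROW)
 +        // for an inclusive end row: row + 0x00 with empty columns; strip the
 +        // trailing 0x00 to recover the row itself before bounding the columns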
 +        row = row.subSequence(0, row.length() - 1);
 +        set = true;
 +      }
 +      
 +      if (set) {
 +        eki = false;
 +        if (maxcq == null)
 +          ek = new Key(row.toArray(), maxcf.toArray(), new byte[0], new byte[0], 0, false).followingKey(PartialKey.ROW_COLFAM);
 +        else
 +          ek = new Key(row.toArray(), maxcf.toArray(), maxcq.toArray(), new byte[0], 0, false).followingKey(PartialKey.ROW_COLFAM_COLQUAL);
 +      }
 +    }
 +    
 +    return new Range(sk, ski, ek, eki);
 +  }
 +  
++  @Override
 +  public String toString() {
 +    return ((startKeyInclusive && start != null) ? "[" : "(") + (start == null ? "-inf" : start) + "," + (stop == null ? "+inf" : stop)
 +        + ((stopKeyInclusive && stop != null) ? "]" : ")");
 +  }
 +  
++  @Override
 +  public void readFields(DataInput in) throws IOException {
 +    infiniteStartKey = in.readBoolean();
 +    infiniteStopKey = in.readBoolean();
 +    if (!infiniteStartKey) {
 +      start = new Key();
 +      start.readFields(in);
 +    } else {
 +      start = null;
 +    }
 +    
 +    if (!infiniteStopKey) {
 +      stop = new Key();
 +      stop.readFields(in);
 +    } else {
 +      stop = null;
 +    }
 +    
 +    startKeyInclusive = in.readBoolean();
 +    stopKeyInclusive = in.readBoolean();
 +
 +    if (!infiniteStartKey && !infiniteStopKey && beforeStartKey(stop)) {
 +      throw new InvalidObjectException("Start key must be less than end key in range (" + start + ", " + stop + ")");
 +    }
 +  }
 +  
++  @Override
 +  public void write(DataOutput out) throws IOException {
 +    out.writeBoolean(infiniteStartKey);
 +    out.writeBoolean(infiniteStopKey);
 +    if (!infiniteStartKey)
 +      start.write(out);
 +    if (!infiniteStopKey)
 +      stop.write(out);
 +    out.writeBoolean(startKeyInclusive);
 +    out.writeBoolean(stopKeyInclusive);
 +  }
 +  
 +  public boolean isStartKeyInclusive() {
 +    return startKeyInclusive;
 +  }
 +  
 +  public boolean isEndKeyInclusive() {
 +    return stopKeyInclusive;
 +  }
 +  
 +  public TRange toThrift() {
 +    return new TRange(start == null ? null : start.toThrift(), stop == null ? null : stop.toThrift(), startKeyInclusive, stopKeyInclusive, infiniteStartKey,
 +        infiniteStopKey);
 +  }
 +  
 +  public boolean isInfiniteStartKey() {
 +    return infiniteStartKey;
 +  }
 +  
 +  public boolean isInfiniteStopKey() {
 +    return infiniteStopKey;
 +  }
 +  
 +  /**
 +   * Creates a range that covers an exact row. Returns the same Range as new Range(row).
 +   * 
 +   * @param row
 +   *          all keys in the range will have this row
 +   */
 +  public static Range exact(Text row) {
 +    return new Range(row);
 +  }
 +  
 +  /**
 +   * Creates a range that covers an exact row and column family
 +   * 
 +   * @param row
 +   *          all keys in the range will have this row
 +   * 
 +   * @param cf
 +   *          all keys in the range will have this column family
 +   */
 +  public static Range exact(Text row, Text cf) {
 +    Key startKey = new Key(row, cf);
 +    return new Range(startKey, true, startKey.followingKey(PartialKey.ROW_COLFAM), false);
 +  }
 +  
 +  /**
 +   * Creates a range that covers an exact row, column family, and column qualifier
 +   * 
 +   * @param row
 +   *          all keys in the range will have this row
 +   * 
 +   * @param cf
 +   *          all keys in the range will have this column family
 +   * 
 +   * @param cq
 +   *          all keys in the range will have this column qualifier
 +   */
 +  public static Range exact(Text row, Text cf, Text cq) {
 +    Key startKey = new Key(row, cf, cq);
 +    return new Range(startKey, true, startKey.followingKey(PartialKey.ROW_COLFAM_COLQUAL), false);
 +  }
 +  
 +  /**
 +   * Creates a range that covers an exact row, column family, column qualifier, and visibility
 +   * 
 +   * @param row
 +   *          all keys in the range will have this row
 +   * 
 +   * @param cf
 +   *          all keys in the range will have this column family
 +   * 
 +   * @param cq
 +   *          all keys in the range will have this column qualifier
 +   * 
 +   * @param cv
 +   *          all keys in the range will have this column visibility
 +   */
 +  public static Range exact(Text row, Text cf, Text cq, Text cv) {
 +    Key startKey = new Key(row, cf, cq, cv);
 +    return new Range(startKey, true, startKey.followingKey(PartialKey.ROW_COLFAM_COLQUAL_COLVIS), false);
 +  }
 +  
 +  /**
 +   * Creates a range that covers an exact row, column family, column qualifier, visibility, and timestamp
 +   * 
 +   * @param row
 +   *          all keys in the range will have this row
 +   * 
 +   * @param cf
 +   *          all keys in the range will have this column family
 +   * 
 +   * @param cq
 +   *          all keys in the range will have this column qualifier
 +   * 
 +   * @param cv
 +   *          all keys in the range will have this column visibility
 +   * 
 +   * @param ts
 +   *          all keys in the range will have this timestamp
 +   */
 +  public static Range exact(Text row, Text cf, Text cq, Text cv, long ts) {
 +    Key startKey = new Key(row, cf, cq, cv, ts);
 +    return new Range(startKey, true, startKey.followingKey(PartialKey.ROW_COLFAM_COLQUAL_COLVIS_TIME), false);
 +  }
 +  
 +  /**
 +   * Returns a Text that sorts just after all Texts beginning with a prefix
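 +   * For example, followingPrefix(new Text("abc")) returns "abd"; if every byte of the prefix is 0xff, null is returned
 +   * because no Text sorts after all such values.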
-    * 
-    * @param prefix
 +   */
 +  public static Text followingPrefix(Text prefix) {
 +    byte[] prefixBytes = prefix.getBytes();
 +    
 +    // find the last byte in the array that is not 0xff
 +    int changeIndex = prefix.getLength() - 1;
 +    while (changeIndex >= 0 && prefixBytes[changeIndex] == (byte) 0xff)
 +      changeIndex--;
 +    if (changeIndex < 0)
 +      return null;
 +    
 +    // copy prefix bytes into new array
 +    byte[] newBytes = new byte[changeIndex + 1];
 +    System.arraycopy(prefixBytes, 0, newBytes, 0, changeIndex + 1);
 +    
 +    // increment the selected byte
 +    newBytes[changeIndex]++;
 +    return new Text(newBytes);
 +  }
 +  
 +  /**
 +   * Returns a Range that covers all rows beginning with a prefix
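 +   * 
 +   * For example, prefix(new Text("ab")) yields the range [ab, ac): row "ab" inclusive up to, but not including, row "ac".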
 +   * 
 +   * @param rowPrefix
 +   *          all keys in the range will have rows that begin with this prefix
 +   */
 +  public static Range prefix(Text rowPrefix) {
 +    Text fp = followingPrefix(rowPrefix);
 +    return new Range(new Key(rowPrefix), true, fp == null ? null : new Key(fp), false);
 +  }
 +  
 +  /**
 +   * Returns a Range that covers all column families beginning with a prefix within a given row
 +   * 
 +   * @param row
 +   *          all keys in the range will have this row
 +   * 
 +   * @param cfPrefix
 +   *          all keys in the range will have column families that begin with this prefix
 +   */
 +  public static Range prefix(Text row, Text cfPrefix) {
 +    Text fp = followingPrefix(cfPrefix);
 +    return new Range(new Key(row, cfPrefix), true, fp == null ? new Key(row).followingKey(PartialKey.ROW) : new Key(row, fp), false);
 +  }
 +  
 +  /**
 +   * Returns a Range that covers all column qualifiers beginning with a prefix within a given row and column family
 +   * 
 +   * @param row
 +   *          all keys in the range will have this row
 +   * 
 +   * @param cf
 +   *          all keys in the range will have this column family
 +   * 
 +   * @param cqPrefix
 +   *          all keys in the range will have column qualifiers that begin with this prefix
 +   */
 +  public static Range prefix(Text row, Text cf, Text cqPrefix) {
 +    Text fp = followingPrefix(cqPrefix);
 +    return new Range(new Key(row, cf, cqPrefix), true, fp == null ? new Key(row, cf).followingKey(PartialKey.ROW_COLFAM) : new Key(row, cf, fp), false);
 +  }
 +  
 +  /**
 +   * Returns a Range that covers all column visibilities beginning with a prefix within a given row, column family, and column qualifier
 +   * 
 +   * @param row
 +   *          all keys in the range will have this row
 +   * 
 +   * @param cf
 +   *          all keys in the range will have this column family
 +   * 
 +   * @param cq
 +   *          all keys in the range will have this column qualifier
 +   * 
 +   * @param cvPrefix
 +   *          all keys in the range will have column visibilities that begin with this prefix
 +   */
 +  public static Range prefix(Text row, Text cf, Text cq, Text cvPrefix) {
 +    Text fp = followingPrefix(cvPrefix);
 +    return new Range(new Key(row, cf, cq, cvPrefix), true, fp == null ? new Key(row, cf, cq).followingKey(PartialKey.ROW_COLFAM_COLQUAL) : new Key(row, cf, cq,
 +        fp), false);
 +  }
 +  
 +  /**
 +   * Creates a range that covers an exact row
 +   * 
 +   * @see Range#exact(Text)
 +   */
 +  public static Range exact(CharSequence row) {
 +    return Range.exact(new Text(row.toString()));
 +  }
 +  
 +  /**
 +   * Creates a range that covers an exact row and column family
 +   * 
 +   * @see Range#exact(Text, Text)
 +   */
 +  public static Range exact(CharSequence row, CharSequence cf) {
 +    return Range.exact(new Text(row.toString()), new Text(cf.toString()));
 +  }
 +  
 +  /**
 +   * Creates a range that covers an exact row, column family, and column qualifier
 +   * 
 +   * @see Range#exact(Text, Text, Text)
 +   */
 +  public static Range exact(CharSequence row, CharSequence cf, CharSequence cq) {
 +    return Range.exact(new Text(row.toString()), new Text(cf.toString()), new Text(cq.toString()));
 +  }
 +  
 +  /**
 +   * Creates a range that covers an exact row, column family, column qualifier, and visibility
 +   * 
 +   * @see Range#exact(Text, Text, Text, Text)
 +   */
 +  public static Range exact(CharSequence row, CharSequence cf, CharSequence cq, CharSequence cv) {
 +    return Range.exact(new Text(row.toString()), new Text(cf.toString()), new Text(cq.toString()), new Text(cv.toString()));
 +  }
 +  
 +  /**
 +   * Creates a range that covers an exact row, column family, column qualifier, visibility, and timestamp
 +   * 
 +   * @see Range#exact(Text, Text, Text, Text, long)
 +   */
 +  public static Range exact(CharSequence row, CharSequence cf, CharSequence cq, CharSequence cv, long ts) {
 +    return Range.exact(new Text(row.toString()), new Text(cf.toString()), new Text(cq.toString()), new Text(cv.toString()), ts);
 +  }
 +  
 +  /**
 +   * Returns a Range that covers all rows beginning with a prefix
 +   * 
 +   * @see Range#prefix(Text)
 +   */
 +  public static Range prefix(CharSequence rowPrefix) {
 +    return Range.prefix(new Text(rowPrefix.toString()));
 +  }
 +  
 +  /**
 +   * Returns a Range that covers all column families beginning with a prefix within a given row
 +   * 
 +   * @see Range#prefix(Text, Text)
 +   */
 +  public static Range prefix(CharSequence row, CharSequence cfPrefix) {
 +    return Range.prefix(new Text(row.toString()), new Text(cfPrefix.toString()));
 +  }
 +  
 +  /**
 +   * Returns a Range that covers all column qualifiers beginning with a prefix within a given row and column family
 +   * 
 +   * @see Range#prefix(Text, Text, Text)
 +   */
 +  public static Range prefix(CharSequence row, CharSequence cf, CharSequence cqPrefix) {
 +    return Range.prefix(new Text(row.toString()), new Text(cf.toString()), new Text(cqPrefix.toString()));
 +  }
 +  
 +  /**
 +   * Returns a Range that covers all column visibilities beginning with a prefix within a given row, column family, and column qualifier
 +   * 
 +   * @see Range#prefix(Text, Text, Text, Text)
 +   */
 +  public static Range prefix(CharSequence row, CharSequence cf, CharSequence cq, CharSequence cvPrefix) {
 +    return Range.prefix(new Text(row.toString()), new Text(cf.toString()), new Text(cq.toString()), new Text(cvPrefix.toString()));
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java
index 2b3cdf5,0000000..0ac5308
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java
@@@ -1,181 -1,0 +1,176 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.file.rfile;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.concurrent.atomic.AtomicInteger;
 +
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.file.blockfile.ABlockReader;
 +import org.apache.accumulo.core.file.rfile.MultiLevelIndex.IndexEntry;
 +
 +/**
 + * A lazily built index of the entries within a cached RFile data block, used to speed up seeks inside the block.
 + */
 +public class BlockIndex {
 +  
 +  public static BlockIndex getIndex(ABlockReader cacheBlock, IndexEntry indexEntry) throws IOException {
 +    
 +    BlockIndex blockIndex = cacheBlock.getIndex(BlockIndex.class);
 +    
 +    int accessCount = blockIndex.accessCount.incrementAndGet();
 +    
 +    // 1 is a power of two, but we do not build an index on the first access
 +    if (accessCount >= 2 && isPowerOfTwo(accessCount)) {
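 +      // rebuild with twice as many entries each time the access count doubles,
 +      // so frequently accessed blocks get a progressively denser index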
 +      blockIndex.buildIndex(accessCount, cacheBlock, indexEntry);
 +    }
 +    
 +    if (blockIndex.blockIndex != null)
 +      return blockIndex;
 +
 +    return null;
 +  }
 +  
 +  private static boolean isPowerOfTwo(int x) {
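 +    // x & (x - 1) clears the lowest set bit; for positive x the result is zero
 +    // only when exactly one bit is set, i.e. x is a power of two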
 +    return ((x > 0) && (x & (x - 1)) == 0);
 +  }
 +  
 +  private AtomicInteger accessCount = new AtomicInteger(0);
 +  private volatile BlockIndexEntry[] blockIndex = null;
 +
 +  public static class BlockIndexEntry implements Comparable<BlockIndexEntry> {
 +    
 +    private Key prevKey;
 +    private int entriesLeft;
 +    private int pos;
 +    
 +    public BlockIndexEntry(int pos, int entriesLeft, Key prevKey) {
 +      this.pos = pos;
 +      this.entriesLeft = entriesLeft;
 +      this.prevKey = prevKey;
 +    }
 +
-     /**
-      * @param key
-      */
 +    public BlockIndexEntry(Key key) {
 +      this.prevKey = key;
 +    }
- 
- 
 +    
 +    public int getEntriesLeft() {
 +      return entriesLeft;
 +    }
 +
 +    @Override
 +    public int compareTo(BlockIndexEntry o) {
 +      return prevKey.compareTo(o.prevKey);
 +    }
 +    
 +    @Override
 +    public String toString() {
 +      return prevKey + " " + entriesLeft + " " + pos;
 +    }
 +    
 +    public Key getPrevKey() {
 +      return prevKey;
 +    }
 +  }
 +  
 +  public BlockIndexEntry seekBlock(Key startKey, ABlockReader cacheBlock) {
 +
 +    // get a local ref to the index, another thread could change it
 +    BlockIndexEntry[] blockIndex = this.blockIndex;
 +    
 +    int pos = Arrays.binarySearch(blockIndex, new BlockIndexEntry(startKey));
 +
 +    int index;
 +    
 +    if (pos < 0) {
 +      if (pos == -1)
 +        return null; // less than the first key in index, did not index the first key in block so just return null... code calling this will scan from beginning
 +                     // of block
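 +      // binarySearch returned (-(insertion point) - 1) on a miss, so
 +      // (pos * -1) - 2 is the index of the entry just before the insertion point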
 +      index = (pos * -1) - 2;
 +    } else {
 +      // found exact key in index
 +      index = pos;
 +      while (index > 0) {
 +        if (blockIndex[index].getPrevKey().equals(startKey))
 +          index--;
 +        else
 +          break;
 +      }
 +    }
 +    
 +    // handle case where multiple keys in block are exactly the same, want to find the earliest key in the index
 +    while (index - 1 > 0) {
 +      if (blockIndex[index].getPrevKey().equals(blockIndex[index - 1].getPrevKey()))
 +        index--;
 +      else
 +        break;
 +
 +    }
 +    
 +    if (index == 0 && blockIndex[index].getPrevKey().equals(startKey))
 +      return null;
 +
 +    BlockIndexEntry bie = blockIndex[index];
 +    cacheBlock.seek(bie.pos);
 +    return bie;
 +  }
 +  
 +  private synchronized void buildIndex(int indexEntries, ABlockReader cacheBlock, IndexEntry indexEntry) throws IOException {
 +    cacheBlock.seek(0);
 +    
 +    RelativeKey rk = new RelativeKey();
 +    Value val = new Value();
 +    
 +    int interval = indexEntry.getNumEntries() / indexEntries;
 +    
 +    if (interval <= 32)
 +      return;
 +    
 +    // multiple threads could try to create the index with different sizes, do not replace a large index with a smaller one
 +    if (this.blockIndex != null && this.blockIndex.length > indexEntries - 1)
 +      return;
 +
 +    int count = 0;
 +    
 +    ArrayList<BlockIndexEntry> index = new ArrayList<BlockIndexEntry>(indexEntries - 1);
 +
 +    while (count < (indexEntry.getNumEntries() - interval + 1)) {
 +
 +      Key myPrevKey = rk.getKey();
 +      int pos = cacheBlock.getPosition();
 +      rk.readFields(cacheBlock);
 +      val.readFields(cacheBlock);
 +
 +      if (count > 0 && count % interval == 0) {
 +        index.add(new BlockIndexEntry(pos, indexEntry.getNumEntries() - count, myPrevKey));
 +      }
 +      
 +      count++;
 +    }
 +
 +    this.blockIndex = index.toArray(new BlockIndexEntry[index.size()]);
 +
 +    cacheBlock.seek(0);
 +  }
 +  
 +  BlockIndexEntry[] getIndexEntries() {
 +    return blockIndex;
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
index 7277c65,0000000..7d15851
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
@@@ -1,971 -1,0 +1,965 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements. See the NOTICE file distributed with this
 + * work for additional information regarding copyright ownership. The ASF
 + * licenses this file to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + * 
 + * http://www.apache.org/licenses/LICENSE-2.0
 + * 
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 + * License for the specific language governing permissions and limitations under
 + * the License.
 + */
 +
 +package org.apache.accumulo.core.file.rfile.bcfile;
 +
 +import java.io.ByteArrayOutputStream;
 +import java.io.Closeable;
 +import java.io.DataInput;
 +import java.io.DataInputStream;
 +import java.io.DataOutput;
 +import java.io.DataOutputStream;
 +import java.io.IOException;
 +import java.io.InputStream;
 +import java.io.OutputStream;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Map;
 +import java.util.TreeMap;
 +
 +import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
 +import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile.BlockRead;
 +import org.apache.accumulo.core.file.rfile.bcfile.CompareUtils.Scalar;
 +import org.apache.accumulo.core.file.rfile.bcfile.CompareUtils.ScalarComparator;
 +import org.apache.accumulo.core.file.rfile.bcfile.CompareUtils.ScalarLong;
 +import org.apache.accumulo.core.file.rfile.bcfile.Compression.Algorithm;
 +import org.apache.accumulo.core.file.rfile.bcfile.Utils.Version;
 +import org.apache.commons.logging.Log;
 +import org.apache.commons.logging.LogFactory;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FSDataInputStream;
 +import org.apache.hadoop.fs.FSDataOutputStream;
 +import org.apache.hadoop.io.BytesWritable;
 +import org.apache.hadoop.io.compress.Compressor;
 +import org.apache.hadoop.io.compress.Decompressor;
 +
 +/**
 + * Block Compressed file, the underlying physical storage layer for TFile. BCFile provides the basic block level compression for the data and meta blocks.
 + * It is separated from TFile as it may be used for other block-compressed file implementations.
 + */
 +public final class BCFile {
 +  // the current version of the BCFile impl; increment it (major or minor) when
 +  // enough changes have been made
 +  static final Version API_VERSION = new Version((short) 1, (short) 0);
 +  static final Log LOG = LogFactory.getLog(BCFile.class);
 +  
 +  /**
 +   * Prevent the instantiation of BCFile objects.
 +   */
 +  private BCFile() {
 +    // nothing
 +  }
 +  
 +  /**
 +   * BCFile writer, the entry point for creating a new BCFile.
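 +   * 
 +   * A minimal usage sketch (illustrative only; fout, conf, and data are assumed to exist, and &quot;gz&quot; stands in
 +   * for any supported compression name):
 +   * 
 +   * <pre>
 +   * BCFile.Writer writer = new BCFile.Writer(fout, &quot;gz&quot;, conf, false);
 +   * BlockAppender ba = writer.prepareDataBlock();
 +   * ba.write(data); // data is an assumed byte[]
 +   * ba.close(); // registers the finished block
 +   * writer.close();
 +   * </pre>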
 +   */
 +  static public class Writer implements Closeable {
 +    private final FSDataOutputStream out;
 +    private final Configuration conf;
 +    // the single meta block containing index of compressed data blocks
 +    final DataIndex dataIndex;
 +    // index for meta blocks
 +    final MetaIndex metaIndex;
 +    boolean blkInProgress = false;
 +    private boolean metaBlkSeen = false;
 +    private boolean closed = false;
 +    long errorCount = 0;
 +    // reusable buffers.
 +    private BytesWritable fsOutputBuffer;
 +    
 +    /**
 +     * Call-back interface to register a block after a block is closed.
 +     */
 +    private static interface BlockRegister {
 +      /**
 +       * Register a block that is fully closed.
 +       * 
 +       * @param raw
 +       *          The size of block in terms of uncompressed bytes.
 +       * @param offsetStart
 +       *          The start offset of the block.
 +       * @param offsetEnd
 +       *          One byte after the end of the block. Compressed block size is offsetEnd - offsetStart.
 +       */
 +      public void register(long raw, long offsetStart, long offsetEnd);
 +    }
 +    
 +    /**
 +     * Intermediate class that maintains the state of a Writable Compression Block.
 +     */
 +    private static final class WBlockState {
 +      private final Algorithm compressAlgo;
 +      private Compressor compressor; // !null only if using native
 +      // Hadoop compression
 +      private final FSDataOutputStream fsOut;
 +      private final long posStart;
 +      private final SimpleBufferedOutputStream fsBufferedOutput;
 +      private OutputStream out;
 +      
 +      /**
 +       * @param compressionAlgo
 +       *          The compression algorithm to be used for compression.
-        * @throws IOException
 +       */
 +      public WBlockState(Algorithm compressionAlgo, FSDataOutputStream fsOut, BytesWritable fsOutputBuffer, Configuration conf) throws IOException {
 +        this.compressAlgo = compressionAlgo;
 +        this.fsOut = fsOut;
 +        this.posStart = fsOut.getPos();
 +        
 +        fsOutputBuffer.setCapacity(TFile.getFSOutputBufferSize(conf));
 +        
 +        this.fsBufferedOutput = new SimpleBufferedOutputStream(this.fsOut, fsOutputBuffer.getBytes());
 +        this.compressor = compressAlgo.getCompressor();
 +        
 +        try {
 +          this.out = compressionAlgo.createCompressionStream(fsBufferedOutput, compressor, 0);
 +        } catch (IOException e) {
 +          compressAlgo.returnCompressor(compressor);
 +          throw e;
 +        }
 +      }
 +      
 +      /**
 +       * Get the output stream for BlockAppender's consumption.
 +       * 
 +       * @return the output stream suitable for writing block data.
 +       */
 +      OutputStream getOutputStream() {
 +        return out;
 +      }
 +      
 +      /**
 +       * Get the current position in file.
 +       * 
 +       * @return The current byte offset in underlying file.
 +       * @throws IOException
 +       */
 +      long getCurrentPos() throws IOException {
 +        return fsOut.getPos() + fsBufferedOutput.size();
 +      }
 +      
 +      long getStartPos() {
 +        return posStart;
 +      }
 +      
 +      /**
 +       * Current size of compressed data.
 +       * 
 +       * @throws IOException
 +       */
 +      long getCompressedSize() throws IOException {
 +        long ret = getCurrentPos() - posStart;
 +        return ret;
 +      }
 +      
 +      /**
 +       * Finishing up the current block.
 +       */
 +      public void finish() throws IOException {
 +        try {
 +          if (out != null) {
 +            out.flush();
 +            out = null;
 +          }
 +        } finally {
 +          compressAlgo.returnCompressor(compressor);
 +          compressor = null;
 +        }
 +      }
 +    }
 +    
 +    /**
 +     * Access point to stuff data into a block.
 +     * 
 +     */
 +    public class BlockAppender extends DataOutputStream {
 +      private final BlockRegister blockRegister;
 +      private final WBlockState wBlkState;
 +      private boolean closed = false;
 +      
 +      /**
 +       * Constructor
 +       * 
 +       * @param register
 +       *          the block register, which is called when the block is closed.
 +       * @param wbs
 +       *          The writable compression block state.
 +       */
 +      BlockAppender(BlockRegister register, WBlockState wbs) {
 +        super(wbs.getOutputStream());
 +        this.blockRegister = register;
 +        this.wBlkState = wbs;
 +      }
 +      
 +      /**
 +       * Get the raw size of the block.
 +       * 
 +       * @return the number of uncompressed bytes written through the BlockAppender so far.
-        * @throws IOException
 +       */
 +      public long getRawSize() throws IOException {
 +        /**
 +         * We expect the size() of a block not to exceed 4GB; size() will wrap to a negative integer if it exceeds 2GB.
 +         */
 +        return size() & 0x00000000ffffffffL;
 +      }
 +      
 +      /**
 +       * Get the compressed size of the block in progress.
 +       * 
 +       * @return the number of compressed bytes written to the underlying FS file. The size may be smaller than what is actually needed to compress all the
 +       *         data written, due to internal buffering inside the compressor.
-        * @throws IOException
 +       */
 +      public long getCompressedSize() throws IOException {
 +        return wBlkState.getCompressedSize();
 +      }
 +      
 +      public long getStartPos() {
 +        return wBlkState.getStartPos();
 +      }
 +      
 +      @Override
 +      public void flush() {
 +        // The downstream is a special kind of stream that finishes a
 +        // compression block upon flush. So we disable flush() here.
 +      }
 +      
 +      /**
 +       * Signals the end of writing to the block. The block register will be called to register the finished block.
 +       */
 +      @Override
 +      public void close() throws IOException {
 +        if (closed == true) {
 +          return;
 +        }
 +        try {
 +          ++errorCount;
 +          wBlkState.finish();
 +          blockRegister.register(getRawSize(), wBlkState.getStartPos(), wBlkState.getCurrentPos());
 +          --errorCount;
 +        } finally {
 +          closed = true;
 +          blkInProgress = false;
 +        }
 +      }
 +    }
 +    
 +    /**
 +     * Constructor
 +     * 
 +     * @param fout
 +     *          FS output stream.
 +     * @param compressionName
 +     *          Name of the compression algorithm, which will be used for all data blocks.
-      * @throws IOException
 +     * @see Compression#getSupportedAlgorithms
 +     */
 +    public Writer(FSDataOutputStream fout, String compressionName, Configuration conf, boolean trackDataBlocks) throws IOException {
 +      if (fout.getPos() != 0) {
 +        throw new IOException("Output file not at zero offset.");
 +      }
 +      
 +      this.out = fout;
 +      this.conf = conf;
 +      dataIndex = new DataIndex(compressionName, trackDataBlocks);
 +      metaIndex = new MetaIndex();
 +      fsOutputBuffer = new BytesWritable();
 +      Magic.write(fout);
 +    }
 +    
 +    /**
 +     * Close the BCFile Writer. Attempting to use the Writer after calling <code>close</code> is not allowed and may lead to undetermined results.
 +     */
++    @Override
 +    public void close() throws IOException {
 +      if (closed == true) {
 +        return;
 +      }
 +      
 +      try {
 +        if (errorCount == 0) {
 +          if (blkInProgress == true) {
 +            throw new IllegalStateException("Close() called with active block appender.");
 +          }
 +          
 +          // add metaBCFileIndex to metaIndex as the last meta block
 +          BlockAppender appender = prepareMetaBlock(DataIndex.BLOCK_NAME, getDefaultCompressionAlgorithm());
 +          try {
 +            dataIndex.write(appender);
 +          } finally {
 +            appender.close();
 +          }
 +          
 +          long offsetIndexMeta = out.getPos();
 +          metaIndex.write(out);
 +          
 +          // Meta Index and the trailing section are written out directly.
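 +          // resulting file tail: ... [meta index] [offsetIndexMeta : long] [API version] [magic]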
 +          out.writeLong(offsetIndexMeta);
 +          
 +          API_VERSION.write(out);
 +          Magic.write(out);
 +          out.flush();
 +        }
 +      } finally {
 +        closed = true;
 +      }
 +    }
 +    
 +    private Algorithm getDefaultCompressionAlgorithm() {
 +      return dataIndex.getDefaultCompressionAlgorithm();
 +    }
 +    
 +    private BlockAppender prepareMetaBlock(String name, Algorithm compressAlgo) throws IOException, MetaBlockAlreadyExists {
 +      if (blkInProgress == true) {
 +        throw new IllegalStateException("Cannot create Meta Block until previous block is closed.");
 +      }
 +      
 +      if (metaIndex.getMetaByName(name) != null) {
 +        throw new MetaBlockAlreadyExists("name=" + name);
 +      }
 +      
 +      MetaBlockRegister mbr = new MetaBlockRegister(name, compressAlgo);
 +      WBlockState wbs = new WBlockState(compressAlgo, out, fsOutputBuffer, conf);
 +      BlockAppender ba = new BlockAppender(mbr, wbs);
 +      blkInProgress = true;
 +      metaBlkSeen = true;
 +      return ba;
 +    }
 +    
 +    /**
 +     * Create a Meta Block and obtain an output stream for adding data into the block. There can only be one BlockAppender stream active at any time. Regular
 +     * Blocks may not be created after the first Meta Blocks. The caller must call BlockAppender.close() to conclude the block creation.
 +     * 
 +     * @param name
 +     *          The name of the Meta Block. The name must not conflict with existing Meta Blocks.
 +     * @param compressionName
 +     *          The name of the compression algorithm to be used.
 +     * @return The BlockAppender stream
-      * @throws IOException
 +     * @throws MetaBlockAlreadyExists
 +     *           If the meta block with the name already exists.
 +     */
 +    public BlockAppender prepareMetaBlock(String name, String compressionName) throws IOException, MetaBlockAlreadyExists {
 +      return prepareMetaBlock(name, Compression.getCompressionAlgorithmByName(compressionName));
 +    }
 +    
 +    /**
 +     * Create a Meta Block and obtain an output stream for adding data into the block. The Meta Block will be compressed with the same compression algorithm as
 +     * data blocks. There can only be one BlockAppender stream active at any time. Regular Blocks may not be created after the first Meta Blocks. The caller
 +     * must call BlockAppender.close() to conclude the block creation.
 +     * 
 +     * @param name
 +     *          The name of the Meta Block. The name must not conflict with existing Meta Blocks.
 +     * @return The BlockAppender stream
 +     * @throws MetaBlockAlreadyExists
 +     *           If the meta block with the name already exists.
-      * @throws IOException
 +     */
 +    public BlockAppender prepareMetaBlock(String name) throws IOException, MetaBlockAlreadyExists {
 +      return prepareMetaBlock(name, getDefaultCompressionAlgorithm());
 +    }
 +    
 +    /**
 +     * Create a Data Block and obtain an output stream for adding data into the block. There can only be one BlockAppender stream active at any time. Data
 +     * Blocks may not be created after the first Meta Blocks. The caller must call BlockAppender.close() to conclude the block creation.
 +     * 
 +     * @return The BlockAppender stream
-      * @throws IOException
 +     */
 +    public BlockAppender prepareDataBlock() throws IOException {
 +      if (blkInProgress == true) {
 +        throw new IllegalStateException("Cannot create Data Block until previous block is closed.");
 +      }
 +      
 +      if (metaBlkSeen == true) {
 +        throw new IllegalStateException("Cannot create Data Block after Meta Blocks.");
 +      }
 +      
 +      DataBlockRegister dbr = new DataBlockRegister();
 +      
 +      WBlockState wbs = new WBlockState(getDefaultCompressionAlgorithm(), out, fsOutputBuffer, conf);
 +      BlockAppender ba = new BlockAppender(dbr, wbs);
 +      blkInProgress = true;
 +      return ba;
 +    }
 +    
 +    /**
 +     * Callback to make sure a meta block is added to the internal list when its stream is closed.
 +     */
 +    private class MetaBlockRegister implements BlockRegister {
 +      private final String name;
 +      private final Algorithm compressAlgo;
 +      
 +      MetaBlockRegister(String name, Algorithm compressAlgo) {
 +        this.name = name;
 +        this.compressAlgo = compressAlgo;
 +      }
 +      
++      @Override
 +      public void register(long raw, long begin, long end) {
 +        metaIndex.addEntry(new MetaIndexEntry(name, compressAlgo, new BlockRegion(begin, end - begin, raw)));
 +      }
 +    }
 +    
 +    /**
 +     * Callback to make sure a data block is added to the internal list when it's being closed.
 +     * 
 +     */
 +    private class DataBlockRegister implements BlockRegister {
 +      DataBlockRegister() {
 +        // do nothing
 +      }
 +      
++      @Override
 +      public void register(long raw, long begin, long end) {
 +        dataIndex.addBlockRegion(new BlockRegion(begin, end - begin, raw));
 +      }
 +    }
 +  }
 +  
 +  /**
 +   * BCFile Reader, interface to read the file's data and meta blocks.
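 +   * 
 +   * Construction seeks to the file tail to read the meta-index offset, version, and magic, then loads the meta and
 +   * data indexes; named meta blocks are subsequently opened via getMetaBlock (see the constructor below).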
 +   */
 +  static public class Reader implements Closeable {
 +    private static final String META_NAME = "BCFile.metaindex";
 +    private final FSDataInputStream in;
 +    private final Configuration conf;
 +    final DataIndex dataIndex;
 +    // Index for meta blocks
 +    final MetaIndex metaIndex;
 +    final Version version;
 +    
 +    /**
 +     * Intermediate class that maintains the state of a Readable Compression Block.
 +     */
 +    static private final class RBlockState {
 +      private final Algorithm compressAlgo;
 +      private Decompressor decompressor;
 +      private final BlockRegion region;
 +      private final InputStream in;
 +      
 +      public RBlockState(Algorithm compressionAlgo, FSDataInputStream fsin, BlockRegion region, Configuration conf) throws IOException {
 +        this.compressAlgo = compressionAlgo;
 +        this.region = region;
 +        this.decompressor = compressionAlgo.getDecompressor();
 +        
 +        try {
 +          this.in = compressAlgo.createDecompressionStream(new BoundedRangeFileInputStream(fsin, this.region.getOffset(), this.region.getCompressedSize()),
 +              decompressor, TFile.getFSInputBufferSize(conf));
 +        } catch (IOException e) {
 +          compressAlgo.returnDecompressor(decompressor);
 +          throw e;
 +        }
 +      }
 +      
 +      /**
 +       * Get the input stream for reading the block.
 +       * 
 +       * @return the input stream suitable for reading block data.
 +       */
 +      public InputStream getInputStream() {
 +        return in;
 +      }
 +      
 +      public String getCompressionName() {
 +        return compressAlgo.getName();
 +      }
 +      
 +      public BlockRegion getBlockRegion() {
 +        return region;
 +      }
 +      
 +      public void finish() throws IOException {
 +        try {
 +          in.close();
 +        } finally {
 +          compressAlgo.returnDecompressor(decompressor);
 +          decompressor = null;
 +        }
 +      }
 +    }
 +    
 +    /**
 +     * Access point to read a block.
 +     */
 +    public static class BlockReader extends DataInputStream {
 +      private final RBlockState rBlkState;
 +      private boolean closed = false;
 +      
 +      BlockReader(RBlockState rbs) {
 +        super(rbs.getInputStream());
 +        rBlkState = rbs;
 +      }
 +      
 +      /**
 +       * Finish reading the block and release all resources.
 +       */
 +      @Override
 +      public void close() throws IOException {
 +        if (closed) {
 +          return;
 +        }
 +        try {
 +          // Do not set rBlkState to null. People may access stats after calling
 +          // close().
 +          rBlkState.finish();
 +        } finally {
 +          closed = true;
 +        }
 +      }
 +      
 +      /**
 +       * Get the name of the compression algorithm used to compress the block.
 +       * 
 +       * @return name of the compression algorithm.
 +       */
 +      public String getCompressionName() {
 +        return rBlkState.getCompressionName();
 +      }
 +      
 +      /**
 +       * Get the uncompressed size of the block.
 +       * 
 +       * @return uncompressed size of the block.
 +       */
 +      public long getRawSize() {
 +        return rBlkState.getBlockRegion().getRawSize();
 +      }
 +      
 +      /**
 +       * Get the compressed size of the block.
 +       * 
 +       * @return compressed size of the block.
 +       */
 +      public long getCompressedSize() {
 +        return rBlkState.getBlockRegion().getCompressedSize();
 +      }
 +      
 +      /**
 +       * Get the starting position of the block in the file.
 +       * 
 +       * @return the starting position of the block in the file.
 +       */
 +      public long getStartPos() {
 +        return rBlkState.getBlockRegion().getOffset();
 +      }
 +    }
 +    
 +    /**
 +     * Constructor
 +     * 
 +     * @param fin
 +     *          FS input stream.
 +     * @param fileLength
 +     *          Length of the corresponding file
-      * @throws IOException
 +     */
 +    public Reader(FSDataInputStream fin, long fileLength, Configuration conf) throws IOException {
 +      this.in = fin;
 +      this.conf = conf;
 +      
 +      // move the cursor to the beginning of the tail, containing: offset to the
 +      // meta block index, version and magic
 +      fin.seek(fileLength - Magic.size() - Version.size() - Long.SIZE / Byte.SIZE);
 +      long offsetIndexMeta = fin.readLong();
 +      version = new Version(fin);
 +      Magic.readAndVerify(fin);
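 +      // Fixed-size tail layout implied by the reads above (an inference from
 +      // this code, not a formal spec):
 +      //   ... | meta index | offsetIndexMeta (8 bytes) | Version | Magic (16 bytes) | EOF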
 +      
 +      if (!version.compatibleWith(BCFile.API_VERSION)) {
 +        throw new RuntimeException("Incompatible BCFile fileBCFileVersion.");
 +      }
 +      
 +      // read meta index
 +      fin.seek(offsetIndexMeta);
 +      metaIndex = new MetaIndex(fin);
 +      
 +      // read data:BCFile.index, the data block index
 +      BlockReader blockR = getMetaBlock(DataIndex.BLOCK_NAME);
 +      try {
 +        dataIndex = new DataIndex(blockR);
 +      } finally {
 +        blockR.close();
 +      }
 +    }
 +    
 +    public Reader(CachableBlockFile.Reader cache, FSDataInputStream fin, long fileLength, Configuration conf) throws IOException {
 +      this.in = fin;
 +      this.conf = conf;
 +      
 +      BlockRead cachedMetaIndex = cache.getCachedMetaBlock(META_NAME);
 +      BlockRead cachedDataIndex = cache.getCachedMetaBlock(DataIndex.BLOCK_NAME);
 +      
 +      if (cachedMetaIndex == null || cachedDataIndex == null) {
 +        // move the cursor to the beginning of the tail, containing: offset to the
 +        // meta block index, version and magic
 +        fin.seek(fileLength - Magic.size() - Version.size() - Long.SIZE / Byte.SIZE);
 +        long offsetIndexMeta = fin.readLong();
 +        version = new Version(fin);
 +        Magic.readAndVerify(fin);
 +        
 +        if (!version.compatibleWith(BCFile.API_VERSION)) {
 +          throw new RuntimeException("Incompatible BCFile fileBCFileVersion.");
 +        }
 +        
 +        // read meta index
 +        fin.seek(offsetIndexMeta);
 +        metaIndex = new MetaIndex(fin);
 +        if (cachedMetaIndex == null) {
 +          ByteArrayOutputStream baos = new ByteArrayOutputStream();
 +          DataOutputStream dos = new DataOutputStream(baos);
 +          metaIndex.write(dos);
 +          dos.close();
 +          cache.cacheMetaBlock(META_NAME, baos.toByteArray());
 +        }
 +        
 +        // read data:BCFile.index, the data block index
 +        if (cachedDataIndex == null) {
 +          BlockReader blockR = getMetaBlock(DataIndex.BLOCK_NAME);
 +          cachedDataIndex = cache.cacheMetaBlock(DataIndex.BLOCK_NAME, blockR);
 +        }
 +        
 +        dataIndex = new DataIndex(cachedDataIndex);
 +        cachedDataIndex.close();
 +        
 +      } else {
 +        // Logger.getLogger(Reader.class).debug("Read bcfile !METADATA from cache");
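 +        // Both indexes came from the block cache, so the file tail was never
 +        // read and the file version is unknown; getBCFileVersion() returns null.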
 +        version = null;
 +        metaIndex = new MetaIndex(cachedMetaIndex);
 +        dataIndex = new DataIndex(cachedDataIndex);
 +      }
 +    }
 +    
 +    /**
 +     * Get the name of the default compression algorithm.
 +     * 
 +     * @return the name of the default compression algorithm.
 +     */
 +    public String getDefaultCompressionName() {
 +      return dataIndex.getDefaultCompressionAlgorithm().getName();
 +    }
 +    
 +    /**
 +     * Get version of BCFile file being read.
 +     * 
 +     * @return version of BCFile file being read.
 +     */
 +    public Version getBCFileVersion() {
 +      return version;
 +    }
 +    
 +    /**
 +     * Get version of BCFile API.
 +     * 
 +     * @return version of BCFile API.
 +     */
 +    public Version getAPIVersion() {
 +      return API_VERSION;
 +    }
 +    
 +    /**
 +     * Finish reading the BCFile and release all resources.
 +     */
++    @Override
 +    public void close() {
 +      // nothing to be done now
 +    }
 +    
 +    /**
 +     * Get the number of data blocks.
 +     * 
 +     * @return the number of data blocks.
 +     */
 +    public int getBlockCount() {
 +      return dataIndex.getBlockRegionList().size();
 +    }
 +    
 +    /**
 +     * Stream access to a Meta Block.
 +     * 
 +     * @param name
 +     *          meta block name
 +     * @return BlockReader input stream for reading the meta block.
-      * @throws IOException
 +     * @throws MetaBlockDoesNotExist
 +     *           The Meta Block with the given name does not exist.
 +     */
 +    public BlockReader getMetaBlock(String name) throws IOException, MetaBlockDoesNotExist {
 +      MetaIndexEntry imeBCIndex = metaIndex.getMetaByName(name);
 +      if (imeBCIndex == null) {
 +        throw new MetaBlockDoesNotExist("name=" + name);
 +      }
 +      
 +      BlockRegion region = imeBCIndex.getRegion();
 +      return createReader(imeBCIndex.getCompressionAlgorithm(), region);
 +    }
 +    
 +    /**
 +     * Stream access to a Data Block.
 +     * 
 +     * @param blockIndex
 +     *          0-based data block index.
 +     * @return BlockReader input stream for reading the data block.
-      * @throws IOException
 +     */
 +    public BlockReader getDataBlock(int blockIndex) throws IOException {
 +      if (blockIndex < 0 || blockIndex >= getBlockCount()) {
 +        throw new IndexOutOfBoundsException(String.format("blockIndex=%d, numBlocks=%d", blockIndex, getBlockCount()));
 +      }
 +      
 +      BlockRegion region = dataIndex.getBlockRegionList().get(blockIndex);
 +      return createReader(dataIndex.getDefaultCompressionAlgorithm(), region);
 +    }
 +    
 +    public BlockReader getDataBlock(long offset, long compressedSize, long rawSize) throws IOException {
 +      BlockRegion region = new BlockRegion(offset, compressedSize, rawSize);
 +      return createReader(dataIndex.getDefaultCompressionAlgorithm(), region);
 +    }
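 +    
 +    /*
 +     * Illustrative read loop (hypothetical caller code): visit every data block
 +     * through the index-based accessor, closing each BlockReader when done.
 +     *
 +     *   for (int i = 0; i < reader.getBlockCount(); i++) {
 +     *     BlockReader blk = reader.getDataBlock(i);
 +     *     try {
 +     *       // consume blk as a DataInputStream
 +     *     } finally {
 +     *       blk.close();
 +     *     }
 +     *   }
 +     */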
 +    
 +    private BlockReader createReader(Algorithm compressAlgo, BlockRegion region) throws IOException {
 +      RBlockState rbs = new RBlockState(compressAlgo, in, region, conf);
 +      return new BlockReader(rbs);
 +    }
 +    
 +    /**
 +     * Find the smallest Block index whose starting offset is greater than or equal to the specified offset.
 +     * 
 +     * @param offset
 +     *          User-specified offset.
 +     * @return the index of the data Block if such a block exists; -1 otherwise.
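 +     *          For example (illustrative, hypothetical offsets): with block offsets [0, 4096, 9000], getBlockIndexNear(4096) returns 1,
 +     *          getBlockIndexNear(4097) returns 2, and getBlockIndexNear(9001) returns -1.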
 +     */
 +    public int getBlockIndexNear(long offset) {
 +      ArrayList<BlockRegion> list = dataIndex.getBlockRegionList();
 +      int idx = Utils.lowerBound(list, new ScalarLong(offset), new ScalarComparator());
 +      
 +      if (idx == list.size()) {
 +        return -1;
 +      }
 +      
 +      return idx;
 +    }
 +  }
 +  
 +  /**
 +   * Index for all Meta blocks.
 +   */
 +  static class MetaIndex {
 +    // use a tree map for getting a meta block entry by name
 +    final Map<String,MetaIndexEntry> index;
 +    
 +    // for write
 +    public MetaIndex() {
 +      index = new TreeMap<String,MetaIndexEntry>();
 +    }
 +    
 +    // for read, construct the map from the file
 +    public MetaIndex(DataInput in) throws IOException {
 +      int count = Utils.readVInt(in);
 +      index = new TreeMap<String,MetaIndexEntry>();
 +      
 +      for (int nx = 0; nx < count; nx++) {
 +        MetaIndexEntry indexEntry = new MetaIndexEntry(in);
 +        index.put(indexEntry.getMetaName(), indexEntry);
 +      }
 +    }
 +    
 +    public void addEntry(MetaIndexEntry indexEntry) {
 +      index.put(indexEntry.getMetaName(), indexEntry);
 +    }
 +    
 +    public MetaIndexEntry getMetaByName(String name) {
 +      return index.get(name);
 +    }
 +    
 +    public void write(DataOutput out) throws IOException {
 +      Utils.writeVInt(out, index.size());
 +      
 +      for (MetaIndexEntry indexEntry : index.values()) {
 +        indexEntry.write(out);
 +      }
 +    }
 +  }
 +  
 +  /**
 +   * An entry describes a meta block in the MetaIndex.
 +   */
 +  static final class MetaIndexEntry {
 +    private final String metaName;
 +    private final Algorithm compressionAlgorithm;
 +    private final static String defaultPrefix = "data:";
 +    
 +    private final BlockRegion region;
 +    
 +    public MetaIndexEntry(DataInput in) throws IOException {
 +      String fullMetaName = Utils.readString(in);
 +      if (fullMetaName.startsWith(defaultPrefix)) {
 +        metaName = fullMetaName.substring(defaultPrefix.length(), fullMetaName.length());
 +      } else {
 +        throw new IOException("Corrupted Meta region Index");
 +      }
 +      
 +      compressionAlgorithm = Compression.getCompressionAlgorithmByName(Utils.readString(in));
 +      region = new BlockRegion(in);
 +    }
 +    
 +    public MetaIndexEntry(String metaName, Algorithm compressionAlgorithm, BlockRegion region) {
 +      this.metaName = metaName;
 +      this.compressionAlgorithm = compressionAlgorithm;
 +      this.region = region;
 +    }
 +    
 +    public String getMetaName() {
 +      return metaName;
 +    }
 +    
 +    public Algorithm getCompressionAlgorithm() {
 +      return compressionAlgorithm;
 +    }
 +    
 +    public BlockRegion getRegion() {
 +      return region;
 +    }
 +    
 +    public void write(DataOutput out) throws IOException {
 +      Utils.writeString(out, defaultPrefix + metaName);
 +      Utils.writeString(out, compressionAlgorithm.getName());
 +      
 +      region.write(out);
 +    }
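 +    
 +    // On-disk form produced by write() above: the string "data:" + metaName, the
 +    // compression algorithm name, then the region as three VLongs (offset,
 +    // compressedSize, rawSize).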
 +  }
 +  
 +  /**
 +   * Index of all compressed data blocks.
 +   */
 +  static class DataIndex {
 +    final static String BLOCK_NAME = "BCFile.index";
 +    
 +    private final Algorithm defaultCompressionAlgorithm;
 +    
 +    // for data blocks, each entry specifies a block's offset, compressed size
 +    // and raw size
 +    private final ArrayList<BlockRegion> listRegions;
 +    
 +    private boolean trackBlocks;
 +    
 +    // for read, deserialized from a file
 +    public DataIndex(DataInput in) throws IOException {
 +      defaultCompressionAlgorithm = Compression.getCompressionAlgorithmByName(Utils.readString(in));
 +      
 +      int n = Utils.readVInt(in);
 +      listRegions = new ArrayList<BlockRegion>(n);
 +      
 +      for (int i = 0; i < n; i++) {
 +        BlockRegion region = new BlockRegion(in);
 +        listRegions.add(region);
 +      }
 +    }
 +    
 +    // for write
 +    public DataIndex(String defaultCompressionAlgorithmName, boolean trackBlocks) {
 +      this.trackBlocks = trackBlocks;
 +      this.defaultCompressionAlgorithm = Compression.getCompressionAlgorithmByName(defaultCompressionAlgorithmName);
 +      listRegions = new ArrayList<BlockRegion>();
 +    }
 +    
 +    public Algorithm getDefaultCompressionAlgorithm() {
 +      return defaultCompressionAlgorithm;
 +    }
 +    
 +    public ArrayList<BlockRegion> getBlockRegionList() {
 +      return listRegions;
 +    }
 +    
 +    public void addBlockRegion(BlockRegion region) {
 +      if (trackBlocks)
 +        listRegions.add(region);
 +    }
 +    
 +    public void write(DataOutput out) throws IOException {
 +      Utils.writeString(out, defaultCompressionAlgorithm.getName());
 +      
 +      Utils.writeVInt(out, listRegions.size());
 +      
 +      for (BlockRegion region : listRegions) {
 +        region.write(out);
 +      }
 +    }
 +  }
 +  
 +  /**
 +   * Magic number uniquely identifying a BCFile in the header/footer.
 +   */
 +  static final class Magic {
 +    private final static byte[] AB_MAGIC_BCFILE = {
 +        // ... total of 16 bytes
 +        (byte) 0xd1, (byte) 0x11, (byte) 0xd3, (byte) 0x68, (byte) 0x91, (byte) 0xb5, (byte) 0xd7, (byte) 0xb6, (byte) 0x39, (byte) 0xdf, (byte) 0x41,
 +        (byte) 0x40, (byte) 0x92, (byte) 0xba, (byte) 0xe1, (byte) 0x50};
 +    
 +    public static void readAndVerify(DataInput in) throws IOException {
 +      byte[] abMagic = new byte[size()];
 +      in.readFully(abMagic);
 +      
 +      // check against AB_MAGIC_BCFILE, if not matching, throw an
 +      // Exception
 +      if (!Arrays.equals(abMagic, AB_MAGIC_BCFILE)) {
 +        throw new IOException("Not a valid BCFile.");
 +      }
 +    }
 +    
 +    public static void write(DataOutput out) throws IOException {
 +      out.write(AB_MAGIC_BCFILE);
 +    }
 +    
 +    public static int size() {
 +      return AB_MAGIC_BCFILE.length;
 +    }
 +  }
 +  
 +  /**
 +   * Block region.
 +   */
 +  static final class BlockRegion implements Scalar {
 +    private final long offset;
 +    private final long compressedSize;
 +    private final long rawSize;
 +    
 +    public BlockRegion(DataInput in) throws IOException {
 +      offset = Utils.readVLong(in);
 +      compressedSize = Utils.readVLong(in);
 +      rawSize = Utils.readVLong(in);
 +    }
 +    
 +    public BlockRegion(long offset, long compressedSize, long rawSize) {
 +      this.offset = offset;
 +      this.compressedSize = compressedSize;
 +      this.rawSize = rawSize;
 +    }
 +    
 +    public void write(DataOutput out) throws IOException {
 +      Utils.writeVLong(out, offset);
 +      Utils.writeVLong(out, compressedSize);
 +      Utils.writeVLong(out, rawSize);
 +    }
 +    
 +    public long getOffset() {
 +      return offset;
 +    }
 +    
 +    public long getCompressedSize() {
 +      return compressedSize;
 +    }
 +    
 +    public long getRawSize() {
 +      return rawSize;
 +    }
 +    
 +    @Override
 +    public long magnitude() {
 +      return offset;
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/ByteArray.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/ByteArray.java
index 2b57638,0000000..d7734a2
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/ByteArray.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/ByteArray.java
@@@ -1,91 -1,0 +1,89 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements. See the NOTICE file distributed with this
 + * work for additional information regarding copyright ownership. The ASF
 + * licenses this file to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + * 
 + * http://www.apache.org/licenses/LICENSE-2.0
 + * 
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 + * License for the specific language governing permissions and limitations under
 + * the License.
 + */
 +
 +package org.apache.accumulo.core.file.rfile.bcfile;
 +
 +import org.apache.hadoop.io.BytesWritable;
 +
 +/**
 + * Adaptor class to wrap byte-array-backed objects (including plain Java byte arrays) as RawComparable objects.
 + */
 +public final class ByteArray implements RawComparable {
 +  private final byte[] buffer;
 +  private final int offset;
 +  private final int len;
 +  
 +  /**
 +   * Construct a ByteArray from a {@link BytesWritable}.
-    * 
-    * @param other
 +   */
 +  public ByteArray(BytesWritable other) {
 +    this(other.getBytes(), 0, other.getLength());
 +  }
 +  
 +  /**
 +   * Wrap a whole byte array as a RawComparable.
 +   * 
 +   * @param buffer
 +   *          the byte array buffer.
 +   */
 +  public ByteArray(byte[] buffer) {
 +    this(buffer, 0, buffer.length);
 +  }
 +  
 +  /**
 +   * Wrap a partial byte array as a RawComparable.
 +   * 
 +   * @param buffer
 +   *          the byte array buffer.
 +   * @param offset
 +   *          the starting offset
 +   * @param len
 +   *          the length of the consecutive bytes to be wrapped.
 +   */
 +  public ByteArray(byte[] buffer, int offset, int len) {
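 +    // Branch-free bounds check: if offset, len, or (buffer.length - offset - len)
 +    // is negative, the bitwise OR of the three has its sign bit set, so a single
 +    // comparison covers all error cases.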
 +    if ((offset | len | (buffer.length - offset - len)) < 0) {
 +      throw new IndexOutOfBoundsException();
 +    }
 +    this.buffer = buffer;
 +    this.offset = offset;
 +    this.len = len;
 +  }
 +  
 +  /**
 +   * @return the underlying buffer.
 +   */
 +  @Override
 +  public byte[] buffer() {
 +    return buffer;
 +  }
 +  
 +  /**
 +   * @return the offset in the buffer.
 +   */
 +  @Override
 +  public int offset() {
 +    return offset;
 +  }
 +  
 +  /**
 +   * @return the size of the byte array.
 +   */
 +  @Override
 +  public int size() {
 +    return len;
 +  }
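 +  
 +  // Illustrative usage (hypothetical caller code): wrap a slice of an existing
 +  // array for raw comparison without copying.
 +  //
 +  //   RawComparable rc = new ByteArray(bytes, off, len);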
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Chunk.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Chunk.java
index a075d87,0000000..345d406
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Chunk.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Chunk.java
@@@ -1,418 -1,0 +1,416 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements. See the NOTICE file distributed with this
 + * work for additional information regarding copyright ownership. The ASF
 + * licenses this file to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + * 
 + * http://www.apache.org/licenses/LICENSE-2.0
 + * 
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 + * License for the specific language governing permissions and limitations under
 + * the License.
 + */
 +package org.apache.accumulo.core.file.rfile.bcfile;
 +
 +import java.io.DataInputStream;
 +import java.io.DataOutputStream;
 +import java.io.IOException;
 +import java.io.InputStream;
 +import java.io.OutputStream;
 +
 +/**
 + * Several related classes to support chunk-encoded sub-streams on top of a regular stream.
 + */
 +final class Chunk {
 +  
 +  /**
 +   * Prevent instantiation of this class.
 +   */
 +  private Chunk() {
 +    // nothing
 +  }
 +  
 +  /**
 +   * Decoding a chain of chunks encoded through ChunkEncoder or SingleChunkEncoder.
 +   */
 +  static public class ChunkDecoder extends InputStream {
 +    private DataInputStream in = null;
 +    private boolean lastChunk;
 +    private int remain = 0;
 +    private boolean closed;
 +    
 +    public ChunkDecoder() {
 +      lastChunk = true;
 +      closed = true;
 +    }
 +    
 +    public void reset(DataInputStream downStream) {
 +      // no need to wind forward the old input.
 +      in = downStream;
 +      lastChunk = false;
 +      remain = 0;
 +      closed = false;
 +    }
 +    
 +    /**
 +     * Constructor
 +     * 
 +     * @param in
 +     *          The source input stream which contains chunk-encoded data stream.
 +     */
 +    public ChunkDecoder(DataInputStream in) {
 +      this.in = in;
 +      lastChunk = false;
 +      closed = false;
 +    }
 +    
 +    /**
 +     * Have we reached the last chunk?
 +     * 
 +     * @return true if we have reached the last chunk.
-      * @throws java.io.IOException
 +     */
 +    public boolean isLastChunk() throws IOException {
 +      checkEOF();
 +      return lastChunk;
 +    }
 +    
 +    /**
 +     * How many bytes remain in the current chunk?
 +     * 
 +     * @return remaining bytes left in the current chunk.
-      * @throws java.io.IOException
 +     */
 +    public int getRemain() throws IOException {
 +      checkEOF();
 +      return remain;
 +    }
 +    
 +    /**
 +     * Read the length of the next chunk.
 +     * 
 +     * @throws java.io.IOException
 +     *           when no more data is available.
 +     */
 +    private void readLength() throws IOException {
 +      remain = Utils.readVInt(in);
 +      if (remain >= 0) {
 +        lastChunk = true;
 +      } else {
 +        remain = -remain;
 +      }
 +    }
 +    
 +    /**
 +     * Check whether we have reached the end of the stream.
 +     * 
 +     * @return false if the chunk encoded stream has more data to read (in which case available() will be greater than 0); true otherwise.
 +     * @throws java.io.IOException
 +     *           on I/O errors.
 +     */
 +    private boolean checkEOF() throws IOException {
 +      if (isClosed())
 +        return true;
 +      while (true) {
 +        if (remain > 0)
 +          return false;
 +        if (lastChunk)
 +          return true;
 +        readLength();
 +      }
 +    }
 +    
 +    /*
 +     * This method never blocks the caller. Returning 0 does not mean we have reached the end of the stream.
 +     */
 +    @Override
 +    public int available() {
 +      return remain;
 +    }
 +    
 +    @Override
 +    public int read() throws IOException {
 +      if (checkEOF())
 +        return -1;
 +      int ret = in.read();
 +      if (ret < 0)
 +        throw new IOException("Corrupted chunk encoding stream");
 +      --remain;
 +      return ret;
 +    }
 +    
 +    @Override
 +    public int read(byte[] b) throws IOException {
 +      return read(b, 0, b.length);
 +    }
 +    
 +    @Override
 +    public int read(byte[] b, int off, int len) throws IOException {
 +      if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
 +        throw new IndexOutOfBoundsException();
 +      }
 +      
 +      if (!checkEOF()) {
 +        int n = Math.min(remain, len);
 +        int ret = in.read(b, off, n);
 +        if (ret < 0)
 +          throw new IOException("Corrupted chunk encoding stream");
 +        remain -= ret;
 +        return ret;
 +      }
 +      return -1;
 +    }
 +    
 +    @Override
 +    public long skip(long n) throws IOException {
 +      if (!checkEOF()) {
 +        long ret = in.skip(Math.min(remain, n));
 +        remain -= ret;
 +        return ret;
 +      }
 +      return 0;
 +    }
 +    
 +    @Override
 +    public boolean markSupported() {
 +      return false;
 +    }
 +    
 +    public boolean isClosed() {
 +      return closed;
 +    }
 +    
 +    @Override
 +    public void close() throws IOException {
 +      if (!closed) {
 +        try {
 +          while (!checkEOF()) {
 +            skip(Integer.MAX_VALUE);
 +          }
 +        } finally {
 +          closed = true;
 +        }
 +      }
 +    }
 +  }
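 +  
 +  /*
 +   * Illustrative decode loop (hypothetical caller code): drain the whole chunk
 +   * chain; read() returns -1 once the terminal chunk is exhausted.
 +   *
 +   *   ChunkDecoder dec = new ChunkDecoder(in);
 +   *   byte[] buf = new byte[4096];
 +   *   for (int n; (n = dec.read(buf)) >= 0;) {
 +   *     // process buf[0..n)
 +   *   }
 +   *   dec.close();
 +   */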
 +  
 +  /**
 +   * Chunk Encoder. Encodes the output data into a chain of chunks in the following sequence: -len1, byte[len1], -len2, byte[len2], ..., len_n, byte[len_n],
 +   * where len1, len2, ..., len_n are the lengths of the data chunks. Non-terminal chunks have their lengths negated. Non-terminal chunks cannot have length 0.
 +   * All lengths are in the range of 0 to Integer.MAX_VALUE and are encoded in Utils.VInt format.
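 +   * 
 +   * For example (illustrative): a stream written as a 3-byte chunk followed by a terminal 4-byte chunk is encoded as VInt(-3), the 3 bytes, VInt(4), the 4
 +   * bytes; an empty terminal chunk is encoded as just VInt(0).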
 +   */
 +  static public class ChunkEncoder extends OutputStream {
 +    /**
 +     * The data output stream it connects to.
 +     */
 +    private DataOutputStream out;
 +    
 +    /**
 +     * The internal buffer that is only used when we do not know the advertised size.
 +     */
 +    private byte buf[];
 +    
 +    /**
 +     * The number of valid bytes in the buffer. This value is always in the range <tt>0</tt> through <tt>buf.length</tt>; elements <tt>buf[0]</tt> through
 +     * <tt>buf[count-1]</tt> contain valid byte data.
 +     */
 +    private int count;
 +    
 +    /**
 +     * Constructor.
 +     * 
 +     * @param out
 +     *          the underlying output stream.
 +     * @param buf
 +     *          user-supplied buffer. The buffer will be used exclusively by the ChunkEncoder during its life cycle.
 +     */
 +    public ChunkEncoder(DataOutputStream out, byte[] buf) {
 +      this.out = out;
 +      this.buf = buf;
 +      this.count = 0;
 +    }
 +    
 +    /**
 +     * Write out a chunk.
 +     * 
 +     * @param chunk
 +     *          The chunk buffer.
 +     * @param offset
 +     *          Offset to chunk buffer for the beginning of chunk.
 +     * @param len
 +     *          Length of the chunk.
 +     * @param last
 +     *          Whether this is the last chunk in the chain.
 +     */
 +    private void writeChunk(byte[] chunk, int offset, int len, boolean last) throws IOException {
 +      if (last) { // always write out the length for the last chunk.
 +        Utils.writeVInt(out, len);
 +        if (len > 0) {
 +          out.write(chunk, offset, len);
 +        }
 +      } else {
 +        if (len > 0) {
 +          Utils.writeVInt(out, -len);
 +          out.write(chunk, offset, len);
 +        }
 +      }
 +    }
 +    
 +    /**
 +     * Write out a chunk that is a concatenation of the internal buffer plus user supplied data. This will never be the last block.
 +     * 
 +     * @param data
 +     *          User supplied data buffer.
 +     * @param offset
 +     *          Offset to user data buffer.
 +     * @param len
 +     *          User data buffer size.
 +     */
 +    private void writeBufData(byte[] data, int offset, int len) throws IOException {
 +      if (count + len > 0) {
 +        Utils.writeVInt(out, -(count + len));
 +        out.write(buf, 0, count);
 +        count = 0;
 +        out.write(data, offset, len);
 +      }
 +    }
 +    
 +    /**
 +     * Flush the internal buffer as a non-terminal chunk.
 +     */
 +    private void flushBuffer() throws IOException {
 +      if (count > 0) {
 +        writeChunk(buf, 0, count, false);
 +        count = 0;
 +      }
 +    }
 +    
 +    @Override
 +    public void write(int b) throws IOException {
 +      if (count >= buf.length) {
 +        flushBuffer();
 +      }
 +      buf[count++] = (byte) b;
 +    }
 +    
 +    @Override
 +    public void write(byte b[]) throws IOException {
 +      write(b, 0, b.length);
 +    }
 +    
 +    @Override
 +    public void write(byte b[], int off, int len) throws IOException {
 +      if ((len + count) >= buf.length) {
 +        /*
 +         * If the input data does not fit in the buffer, flush the output buffer and then write the data directly. In this way buffered streams will cascade
 +         * harmlessly.
 +         */
 +        writeBufData(b, off, len);
 +        return;
 +      }
 +      
 +      System.arraycopy(b, off, buf, count, len);
 +      count += len;
 +    }
 +    
 +    @Override
 +    public void flush() throws IOException {
 +      flushBuffer();
 +      out.flush();
 +    }
 +    
 +    @Override
 +    public void close() throws IOException {
 +      if (buf != null) {
 +        try {
 +          writeChunk(buf, 0, count, true);
 +        } finally {
 +          buf = null;
 +          out = null;
 +        }
 +      }
 +    }
 +  }
 +  
 +  /**
 +   * Encode the whole stream as a single chunk. The size of the chunk must be known up-front.
 +   */
 +  static public class SingleChunkEncoder extends OutputStream {
 +    /**
 +     * The data output stream it connects to.
 +     */
 +    private final DataOutputStream out;
 +    
 +    /**
 +     * The remaining bytes to be written.
 +     */
 +    private int remain;
 +    private boolean closed = false;
 +    
 +    /**
 +     * Constructor.
 +     * 
 +     * @param out
 +     *          the underlying output stream.
 +     * @param size
 +     *          The total # of bytes to be written as a single chunk.
 +     * @throws java.io.IOException
 +     *           if an I/O error occurs.
 +     */
 +    public SingleChunkEncoder(DataOutputStream out, int size) throws IOException {
 +      this.out = out;
 +      this.remain = size;
 +      Utils.writeVInt(out, size);
 +    }
 +    
 +    @Override
 +    public void write(int b) throws IOException {
 +      if (remain > 0) {
 +        out.write(b);
 +        --remain;
 +      } else {
 +        throw new IOException("Writing more bytes than advertised size.");
 +      }
 +    }
 +    
 +    @Override
 +    public void write(byte b[]) throws IOException {
 +      write(b, 0, b.length);
 +    }
 +    
 +    @Override
 +    public void write(byte b[], int off, int len) throws IOException {
 +      if (remain >= len) {
 +        out.write(b, off, len);
 +        remain -= len;
 +      } else {
 +        throw new IOException("Writing more bytes than advertised size.");
 +      }
 +    }
 +    
 +    @Override
 +    public void flush() throws IOException {
 +      out.flush();
 +    }
 +    
 +    @Override
 +    public void close() throws IOException {
 +      if (closed) {
 +        return;
 +      }
 +      
 +      try {
 +        if (remain > 0) {
 +          throw new IOException("Writing less bytes than advertised size.");
 +        }
 +      } finally {
 +        closed = true;
 +      }
 +    }
 +  }
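 +  
 +  /*
 +   * Illustrative usage (hypothetical caller code): exactly the advertised number
 +   * of bytes must be written; write() throws past the limit and close() throws
 +   * if bytes remain.
 +   *
 +   *   SingleChunkEncoder enc = new SingleChunkEncoder(dos, payload.length);
 +   *   enc.write(payload);
 +   *   enc.close();
 +   */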
 +}