You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2013/12/18 15:53:36 UTC

[06/13] git commit: ACCUMULO-1986 merge to 1.5.1-SNAPSHOT

ACCUMULO-1986 merge to 1.5.1-SNAPSHOT


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/a5e3ed3b
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/a5e3ed3b
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/a5e3ed3b

Branch: refs/heads/master
Commit: a5e3ed3bb249d287d67f9079297ee5936ac2c914
Parents: 144d9d5 adee0f1
Author: Eric Newton <er...@gmail.com>
Authored: Wed Dec 18 09:38:17 2013 -0500
Committer: Eric Newton <er...@gmail.com>
Committed: Wed Dec 18 09:38:17 2013 -0500

----------------------------------------------------------------------
 .../java/org/apache/accumulo/core/data/Key.java |  13 ++
 .../org/apache/accumulo/core/data/KeyTest.java  |  30 +++-
 .../apache/accumulo/core/data/MutationTest.java | 143 ++++++++++++-------
 .../apache/accumulo/core/data/OldMutation.java  |   7 +
 4 files changed, 136 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/a5e3ed3b/core/src/main/java/org/apache/accumulo/core/data/Key.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/data/Key.java
index de9e22d,0000000..4b6867f
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/data/Key.java
+++ b/core/src/main/java/org/apache/accumulo/core/data/Key.java
@@@ -1,850 -1,0 +1,863 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.data;
 +
 +/**
 + * This is the Key used to store and access individual values in Accumulo.  A Key is a tuple composed of a row, column family, column qualifier, 
 + * column visibility, timestamp, and delete marker.
 + * 
 + * Keys are comparable and therefore have a sorted order defined by {@link #compareTo(Key)}.
 + * 
 + */
 +
 +import static org.apache.accumulo.core.util.ByteBufferUtil.toBytes;
 +
 +import java.io.DataInput;
 +import java.io.DataOutput;
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.util.Arrays;
 +import java.util.List;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.data.thrift.TKey;
 +import org.apache.accumulo.core.data.thrift.TKeyValue;
 +import org.apache.accumulo.core.security.ColumnVisibility;
 +import org.apache.hadoop.io.Text;
 +import org.apache.hadoop.io.WritableComparable;
 +import org.apache.hadoop.io.WritableComparator;
 +import org.apache.hadoop.io.WritableUtils;
 +
 +public class Key implements WritableComparable<Key>, Cloneable {
 +  
 +  protected byte[] row;
 +  protected byte[] colFamily;
 +  protected byte[] colQualifier;
 +  protected byte[] colVisibility;
 +  protected long timestamp;
 +  protected boolean deleted;
 +  
 +  @Override
 +  public boolean equals(Object o) {
 +    if (o instanceof Key)
 +      return this.equals((Key) o, PartialKey.ROW_COLFAM_COLQUAL_COLVIS_TIME_DEL);
 +    return false;
 +  }
 +  
 +  private static final byte EMPTY_BYTES[] = new byte[0];
 +  
 +  private byte[] copyIfNeeded(byte ba[], int off, int len, boolean copyData) {
 +    if (len == 0) // every zero-length field shares the single EMPTY_BYTES instance
 +      return EMPTY_BYTES;
 +    
 +    if (!copyData && ba.length == len && off == 0) // caller permits aliasing and the range covers the whole array
 +      return ba;
 +    
 +    byte[] copy = new byte[len]; // otherwise take a private copy of exactly the requested range
 +    System.arraycopy(ba, off, copy, 0, len);
 +    return copy;
 +  }
 +  
 +  private final void init(byte r[], int rOff, int rLen, byte cf[], int cfOff, int cfLen, byte cq[], int cqOff, int cqLen, byte cv[], int cvOff, int cvLen,
 +      long ts, boolean del, boolean copy) {
 +    row = copyIfNeeded(r, rOff, rLen, copy);
 +    colFamily = copyIfNeeded(cf, cfOff, cfLen, copy);
 +    colQualifier = copyIfNeeded(cq, cqOff, cqLen, copy);
 +    colVisibility = copyIfNeeded(cv, cvOff, cvLen, copy);
 +    timestamp = ts;
 +    deleted = del;
 +  }
 +  
 +  /**
 +   * Creates a key with empty row, empty column family, empty column qualifier, empty column visibility, timestamp {@link Long#MAX_VALUE}, and delete marker
 +   * false.
 +   */
 +  public Key() {
 +    row = EMPTY_BYTES;
 +    colFamily = EMPTY_BYTES;
 +    colQualifier = EMPTY_BYTES;
 +    colVisibility = EMPTY_BYTES;
 +    timestamp = Long.MAX_VALUE;
 +    deleted = false;
 +  }
 +  
 +  /**
 +   * Creates a key with the specified row, empty column family, empty column qualifier, empty column visibility, timestamp {@link Long#MAX_VALUE}, and delete
 +   * marker false.
 +   */
 +  public Key(Text row) {
 +    init(row.getBytes(), 0, row.getLength(), EMPTY_BYTES, 0, 0, EMPTY_BYTES, 0, 0, EMPTY_BYTES, 0, 0, Long.MAX_VALUE, false, true);
 +  }
 +  
 +  /**
 +   * Creates a key with the specified row, empty column family, empty column qualifier, empty column visibility, the specified timestamp, and delete marker
 +   * false.
 +   */
 +  public Key(Text row, long ts) {
 +    this(row);
 +    timestamp = ts;
 +  }
 +  
 +  public Key(byte row[], int rOff, int rLen, byte cf[], int cfOff, int cfLen, byte cq[], int cqOff, int cqLen, byte cv[], int cvOff, int cvLen, long ts) {
 +    init(row, rOff, rLen, cf, cfOff, cfLen, cq, cqOff, cqLen, cv, cvOff, cvLen, ts, false, true);
 +  }
 +  
 +  public Key(byte[] row, byte[] colFamily, byte[] colQualifier, byte[] colVisibility, long timestamp) {
 +    this(row, colFamily, colQualifier, colVisibility, timestamp, false, true);
 +  }
 +  
 +  public Key(byte[] row, byte[] cf, byte[] cq, byte[] cv, long ts, boolean deleted) {
 +    this(row, cf, cq, cv, ts, deleted, true);
 +  }
 +  
 +  public Key(byte[] row, byte[] cf, byte[] cq, byte[] cv, long ts, boolean deleted, boolean copy) {
 +    init(row, 0, row.length, cf, 0, cf.length, cq, 0, cq.length, cv, 0, cv.length, ts, deleted, copy); // with copy == false, non-empty arrays are kept by reference rather than duplicated
 +  }
 +  
 +  /**
 +   * Creates a key with the specified row, the specified column family, empty column qualifier, empty column visibility, timestamp {@link Long#MAX_VALUE}, and
 +   * delete marker false.
 +   */
 +  public Key(Text row, Text cf) {
 +    init(row.getBytes(), 0, row.getLength(), cf.getBytes(), 0, cf.getLength(), EMPTY_BYTES, 0, 0, EMPTY_BYTES, 0, 0, Long.MAX_VALUE, false, true);
 +  }
 +  
 +  /**
 +   * Creates a key with the specified row, the specified column family, the specified column qualifier, empty column visibility, timestamp
 +   * {@link Long#MAX_VALUE}, and delete marker false.
 +   */
 +  public Key(Text row, Text cf, Text cq) {
 +    init(row.getBytes(), 0, row.getLength(), cf.getBytes(), 0, cf.getLength(), cq.getBytes(), 0, cq.getLength(), EMPTY_BYTES, 0, 0, Long.MAX_VALUE, false, true);
 +  }
 +  
 +  /**
 +   * Creates a key with the specified row, the specified column family, the specified column qualifier, the specified column visibility, timestamp
 +   * {@link Long#MAX_VALUE}, and delete marker false.
 +   */
 +  public Key(Text row, Text cf, Text cq, Text cv) {
 +    init(row.getBytes(), 0, row.getLength(), cf.getBytes(), 0, cf.getLength(), cq.getBytes(), 0, cq.getLength(), cv.getBytes(), 0, cv.getLength(),
 +        Long.MAX_VALUE, false, true);
 +  }
 +  
 +  /**
 +   * Creates a key with the specified row, the specified column family, the specified column qualifier, empty column visibility, the specified timestamp, and
 +   * delete marker false.
 +   */
 +  public Key(Text row, Text cf, Text cq, long ts) {
 +    init(row.getBytes(), 0, row.getLength(), cf.getBytes(), 0, cf.getLength(), cq.getBytes(), 0, cq.getLength(), EMPTY_BYTES, 0, 0, ts, false, true);
 +  }
 +  
 +  /**
 +   * Creates a key with the specified row, the specified column family, the specified column qualifier, the specified column visibility, the specified
 +   * timestamp, and delete marker false.
 +   */
 +  public Key(Text row, Text cf, Text cq, Text cv, long ts) {
 +    init(row.getBytes(), 0, row.getLength(), cf.getBytes(), 0, cf.getLength(), cq.getBytes(), 0, cq.getLength(), cv.getBytes(), 0, cv.getLength(), ts, false,
 +        true);
 +  }
 +  
 +  /**
 +   * Creates a key with the specified row, the specified column family, the specified column qualifier, the specified column visibility, the specified
 +   * timestamp, and delete marker false.
 +   */
 +  public Key(Text row, Text cf, Text cq, ColumnVisibility cv, long ts) {
 +    byte[] expr = cv.getExpression();
 +    init(row.getBytes(), 0, row.getLength(), cf.getBytes(), 0, cf.getLength(), cq.getBytes(), 0, cq.getLength(), expr, 0, expr.length, ts, false, true);
 +  }
 +  
 +  /**
 +   * Converts CharSequence to Text and creates a Key using {@link #Key(Text)}.
 +   */
 +  public Key(CharSequence row) {
 +    this(new Text(row.toString()));
 +  }
 +  
 +  /**
 +   * Converts CharSequence to Text and creates a Key using {@link #Key(Text,Text)}.
 +   */
 +  public Key(CharSequence row, CharSequence cf) {
 +    this(new Text(row.toString()), new Text(cf.toString()));
 +  }
 +  
 +  /**
 +   * Converts CharSequence to Text and creates a Key using {@link #Key(Text,Text,Text)}.
 +   */
 +  public Key(CharSequence row, CharSequence cf, CharSequence cq) {
 +    this(new Text(row.toString()), new Text(cf.toString()), new Text(cq.toString()));
 +  }
 +  
 +  /**
 +   * Converts CharSequence to Text and creates a Key using {@link #Key(Text,Text,Text,Text)}.
 +   */
 +  public Key(CharSequence row, CharSequence cf, CharSequence cq, CharSequence cv) {
 +    this(new Text(row.toString()), new Text(cf.toString()), new Text(cq.toString()), new Text(cv.toString()));
 +  }
 +  
 +  /**
 +   * Converts CharSequence to Text and creates a Key using {@link #Key(Text,Text,Text,long)}.
 +   */
 +  public Key(CharSequence row, CharSequence cf, CharSequence cq, long ts) {
 +    this(new Text(row.toString()), new Text(cf.toString()), new Text(cq.toString()), ts);
 +  }
 +  
 +  /**
 +   * Converts CharSequence to Text and creates a Key using {@link #Key(Text,Text,Text,Text,long)}.
 +   */
 +  public Key(CharSequence row, CharSequence cf, CharSequence cq, CharSequence cv, long ts) {
 +    this(new Text(row.toString()), new Text(cf.toString()), new Text(cq.toString()), new Text(cv.toString()), ts);
 +  }
 +  
 +  /**
 +   * Converts each CharSequence, and the expression of the ColumnVisibility, to Text and creates a Key using {@link #Key(Text,Text,Text,Text,long)}.
 +   */
 +  public Key(CharSequence row, CharSequence cf, CharSequence cq, ColumnVisibility cv, long ts) {
 +    this(new Text(row.toString()), new Text(cf.toString()), new Text(cq.toString()), new Text(cv.getExpression()), ts);
 +  }
 +  
 +  private byte[] followingArray(byte ba[]) {
 +    byte[] fba = new byte[ba.length + 1]; // one byte longer than the input
 +    System.arraycopy(ba, 0, fba, 0, ba.length);
 +    fba[ba.length] = (byte) 0x00; // same prefix plus a trailing zero byte: sorts right after ba in lexicographic byte order
 +    return fba;
 +  }
 +  
 +  /**
 +   * Returns a key that will sort immediately after this key.
 +   * Fields beyond {@code part} keep the defaults of {@link #Key()}; byte fields that are carried over unchanged are shared with this key, not copied.
 +   * @param part
 +   *          PartialKey except {@link PartialKey#ROW_COLFAM_COLQUAL_COLVIS_TIME_DEL}, which is rejected with an IllegalArgumentException
 +   */
 +  public Key followingKey(PartialKey part) {
 +    Key returnKey = new Key();
 +    switch (part) {
 +      case ROW:
 +        returnKey.row = followingArray(row);
 +        break;
 +      case ROW_COLFAM:
 +        returnKey.row = row;
 +        returnKey.colFamily = followingArray(colFamily);
 +        break;
 +      case ROW_COLFAM_COLQUAL:
 +        returnKey.row = row;
 +        returnKey.colFamily = colFamily;
 +        returnKey.colQualifier = followingArray(colQualifier);
 +        break;
 +      case ROW_COLFAM_COLQUAL_COLVIS:
 +        // This isn't useful for inserting into accumulo, but may be useful for lookups.
 +        returnKey.row = row;
 +        returnKey.colFamily = colFamily;
 +        returnKey.colQualifier = colQualifier;
 +        returnKey.colVisibility = followingArray(colVisibility);
 +        break;
 +      case ROW_COLFAM_COLQUAL_COLVIS_TIME:
 +        returnKey.row = row;
 +        returnKey.colFamily = colFamily;
 +        returnKey.colQualifier = colQualifier;
 +        returnKey.colVisibility = colVisibility;
 +        returnKey.setTimestamp(timestamp - 1); // timestamps sort descending, so the next key has a smaller one; NOTE(review): wraps around when timestamp == Long.MIN_VALUE
 +        returnKey.deleted = false;
 +        break;
 +      default:
 +        throw new IllegalArgumentException("Partial key specification " + part + " disallowed");
 +    }
 +    return returnKey;
 +  }
 +  
 +  /**
 +   * Creates a key with the same row, column family, column qualifier, column visibility, timestamp, and delete marker as the given key.
 +   */
 +  public Key(Key other) {
 +    set(other);
 +  }
 +  
 +  public Key(TKey tkey) {
 +    this.row = toBytes(tkey.row);
 +    this.colFamily = toBytes(tkey.colFamily);
 +    this.colQualifier = toBytes(tkey.colQualifier);
 +    this.colVisibility = toBytes(tkey.colVisibility);
 +    this.timestamp = tkey.timestamp;
 +    this.deleted = false; // the delete marker is not read from tkey; a key built here is never marked deleted
++
++    if (row == null) { // presumably toBytes yields null for a missing buffer (TODO confirm), e.g. a field nulled by compress() and not yet restored
++      throw new IllegalArgumentException("null row");
++    }
++    if (colFamily == null) {
++      throw new IllegalArgumentException("null column family");
++    }
++    if (colQualifier == null) {
++      throw new IllegalArgumentException("null column qualifier");
++    }
++    if (colVisibility == null) {
++      throw new IllegalArgumentException("null column visibility");
++    }
 +  }
 +  
 +  /**
 +   * This method gives users control over allocation of Text objects by copying into the passed in text.
 +   * 
 +   * @param r
 +   *          the key's row will be copied into this Text
 +   * @return the Text that was passed in
 +   */
 +  
 +  public Text getRow(Text r) {
 +    r.set(row, 0, row.length);
 +    return r;
 +  }
 +  
 +  /**
 +   * This method returns a pointer to the keys internal data and does not copy it.
 +   * 
 +   * @return ByteSequence that points to the internal key row data.
 +   */
 +  
 +  public ByteSequence getRowData() {
 +    return new ArrayByteSequence(row);
 +  }
 +  
 +  /**
 +   * This method allocates a Text object and copies into it.
 +   * 
 +   * @return Text containing the row field
 +   */
 +  
 +  public Text getRow() {
 +    return getRow(new Text());
 +  }
 +  
 +  /**
 +   * Efficiently compare the row of a key without allocating a Text object and copying the row into it.
 +   * 
 +   * @param r
 +   *          row to compare to the key's row
 +   * @return same as {@link #getRow()}.compareTo(r)
 +   */
 +  
 +  public int compareRow(Text r) {
 +    return WritableComparator.compareBytes(row, 0, row.length, r.getBytes(), 0, r.getLength());
 +  }
 +  
 +  /**
 +   * This method returns a pointer to the keys internal data and does not copy it.
 +   * 
 +   * @return ByteSequence that points to the internal key column family data.
 +   */
 +  
 +  public ByteSequence getColumnFamilyData() {
 +    return new ArrayByteSequence(colFamily);
 +  }
 +  
 +  /**
 +   * This method gives users control over allocation of Text objects by copying into the passed in text.
 +   * 
 +   * @param cf
 +   *          the key's column family will be copied into this Text
 +   * @return the Text that was passed in
 +   */
 +  
 +  public Text getColumnFamily(Text cf) {
 +    cf.set(colFamily, 0, colFamily.length);
 +    return cf;
 +  }
 +  
 +  /**
 +   * This method allocates a Text object and copies into it.
 +   * 
 +   * @return Text containing the column family field
 +   */
 +  
 +  public Text getColumnFamily() {
 +    return getColumnFamily(new Text());
 +  }
 +  
 +  /**
 +   * Efficiently compare the column family of a key without allocating a Text object and copying the column family into it.
 +   * 
 +   * @param cf
 +   *          column family to compare to the key's column family
 +   * @return same as {@link #getColumnFamily()}.compareTo(cf)
 +   */
 +  
 +  public int compareColumnFamily(Text cf) {
 +    return WritableComparator.compareBytes(colFamily, 0, colFamily.length, cf.getBytes(), 0, cf.getLength());
 +  }
 +  
 +  /**
 +   * This method returns a pointer to the keys internal data and does not copy it.
 +   * 
 +   * @return ByteSequence that points to the internal key column qualifier data.
 +   */
 +  
 +  public ByteSequence getColumnQualifierData() {
 +    return new ArrayByteSequence(colQualifier);
 +  }
 +  
 +  /**
 +   * This method gives users control over allocation of Text objects by copying into the passed in text.
 +   * 
 +   * @param cq
 +   *          the key's column qualifier will be copied into this Text
 +   * @return the Text that was passed in
 +   */
 +  
 +  public Text getColumnQualifier(Text cq) {
 +    cq.set(colQualifier, 0, colQualifier.length);
 +    return cq;
 +  }
 +  
 +  /**
 +   * This method allocates a Text object and copies into it.
 +   * 
 +   * @return Text containing the column qualifier field
 +   */
 +  
 +  public Text getColumnQualifier() {
 +    return getColumnQualifier(new Text());
 +  }
 +  
 +  /**
 +   * Efficiently compare the column qualifier of a key without allocating a Text object and copying the column qualifier into it.
 +   * 
 +   * @param cq
 +   *          column qualifier to compare to the key's column qualifier
 +   * @return same as {@link #getColumnQualifier()}.compareTo(cq)
 +   */
 +  
 +  public int compareColumnQualifier(Text cq) {
 +    return WritableComparator.compareBytes(colQualifier, 0, colQualifier.length, cq.getBytes(), 0, cq.getLength());
 +  }
 +  
 +  public void setTimestamp(long ts) {
 +    this.timestamp = ts;
 +  }
 +  
 +  public long getTimestamp() {
 +    return timestamp;
 +  }
 +  
 +  public boolean isDeleted() {
 +    return deleted;
 +  }
 +  
 +  public void setDeleted(boolean deleted) {
 +    this.deleted = deleted;
 +  }
 +  
 +  /**
 +   * This method returns a pointer to the keys internal data and does not copy it.
 +   * 
 +   * @return ByteSequence that points to the internal key column visibility data.
 +   */
 +  
 +  public ByteSequence getColumnVisibilityData() {
 +    return new ArrayByteSequence(colVisibility);
 +  }
 +  
 +  /**
 +   * This method allocates a Text object and copies into it.
 +   * 
 +   * @return Text containing the column visibility field
 +   */
 +  
 +  public final Text getColumnVisibility() {
 +    return getColumnVisibility(new Text());
 +  }
 +  
 +  /**
 +   * This method gives users control over allocation of Text objects by copying into the passed in text.
 +   * 
 +   * @param cv
 +   *          the key's column visibility will be copied into this Text
 +   * @return the Text that was passed in
 +   */
 +  
 +  public final Text getColumnVisibility(Text cv) {
 +    cv.set(colVisibility, 0, colVisibility.length);
 +    return cv;
 +  }
 +  
 +  /**
 +   * This method creates a new ColumnVisibility representing the column visibility for this key
 +   * 
 +   * WARNING: using this method may inhibit performance since a new ColumnVisibility object is created on every call.
 +   * 
 +   * @return A new object representing the column visibility field
 +   * @since 1.5.0
 +   */
 +  public final ColumnVisibility getColumnVisibilityParsed() {
 +    return new ColumnVisibility(colVisibility);
 +  }
 +  
 +  /**
 +   * Sets this key's row, column family, column qualifier, column visibility, timestamp, and delete marker to be the same as another key's. The byte arrays are shared with {@code k}, not copied.
 +   */
 +  public void set(Key k) {
 +    row = k.row;
 +    colFamily = k.colFamily;
 +    colQualifier = k.colQualifier;
 +    colVisibility = k.colVisibility;
 +    timestamp = k.timestamp;
 +    deleted = k.deleted;
 +    
 +  }
 +  
 +  public void readFields(DataInput in) throws IOException {
 +    // The header holds cumulative end offsets rather than per-field lengths, which keeps the format
 +    // compatible with older code that serialized data; each field length is the difference of two offsets.
 +    
 +    int colFamilyOffset = WritableUtils.readVInt(in);
 +    int colQualifierOffset = WritableUtils.readVInt(in);
 +    int colVisibilityOffset = WritableUtils.readVInt(in);
 +    int totalLen = WritableUtils.readVInt(in);
 +    
 +    row = new byte[colFamilyOffset]; // the row ends where the column family starts
 +    colFamily = new byte[colQualifierOffset - colFamilyOffset];
 +    colQualifier = new byte[colVisibilityOffset - colQualifierOffset];
 +    colVisibility = new byte[totalLen - colVisibilityOffset];
 +    
 +    in.readFully(row); // the four fields follow the header back to back, in this order
 +    in.readFully(colFamily);
 +    in.readFully(colQualifier);
 +    in.readFully(colVisibility);
 +    
 +    timestamp = WritableUtils.readVLong(in);
 +    deleted = in.readBoolean();
 +  }
 +  
 +  public void write(DataOutput out) throws IOException {
 +    
 +    int colFamilyOffset = row.length; // offsets are cumulative: each one is where the next field starts in the concatenated bytes
 +    int colQualifierOffset = colFamilyOffset + colFamily.length;
 +    int colVisibilityOffset = colQualifierOffset + colQualifier.length;
 +    int totalLen = colVisibilityOffset + colVisibility.length;
 +    
 +    WritableUtils.writeVInt(out, colFamilyOffset); // header: three offsets followed by the total length
 +    WritableUtils.writeVInt(out, colQualifierOffset);
 +    WritableUtils.writeVInt(out, colVisibilityOffset);
 +    
 +    WritableUtils.writeVInt(out, totalLen);
 +    
 +    out.write(row); // body: the four byte fields back to back, mirrored by readFields
 +    out.write(colFamily);
 +    out.write(colQualifier);
 +    out.write(colVisibility);
 +    
 +    WritableUtils.writeVLong(out, timestamp);
 +    out.writeBoolean(deleted);
 +  }
 +  
 +  /**
 +   * Compare part of a key. For example compare just the row and column family, and if those are equal then return true.
 +   * @throws IllegalArgumentException if {@code part} is not one of the PartialKey values handled by this method
 +   */
 +  
 +  public boolean equals(Key other, PartialKey part) {
 +    switch (part) {
 +      case ROW:
 +        return isEqual(row, other.row);
 +      case ROW_COLFAM:
 +        return isEqual(row, other.row) && isEqual(colFamily, other.colFamily);
 +      case ROW_COLFAM_COLQUAL:
 +        return isEqual(row, other.row) && isEqual(colFamily, other.colFamily) && isEqual(colQualifier, other.colQualifier);
 +      case ROW_COLFAM_COLQUAL_COLVIS:
 +        return isEqual(row, other.row) && isEqual(colFamily, other.colFamily) && isEqual(colQualifier, other.colQualifier)
 +            && isEqual(colVisibility, other.colVisibility);
 +      case ROW_COLFAM_COLQUAL_COLVIS_TIME:
 +        return isEqual(row, other.row) && isEqual(colFamily, other.colFamily) && isEqual(colQualifier, other.colQualifier)
 +            && isEqual(colVisibility, other.colVisibility) && timestamp == other.timestamp;
 +      case ROW_COLFAM_COLQUAL_COLVIS_TIME_DEL:
 +        return isEqual(row, other.row) && isEqual(colFamily, other.colFamily) && isEqual(colQualifier, other.colQualifier)
 +            && isEqual(colVisibility, other.colVisibility) && timestamp == other.timestamp && deleted == other.deleted;
 +      default:
 +        throw new IllegalArgumentException("Unrecognized partial key specification " + part);
 +    }
 +  }
 +  
 +  /**
 +   * Compare elements of a key given by a {@link PartialKey}. For example, for {@link PartialKey#ROW_COLFAM}, compare just the row and column family. If the
 +   * rows are not equal, return the result of the row comparison; otherwise, return the result of the column family comparison.
 +   * 
 +   * @see #compareTo(Key)
 +   */
 +  
 +  public int compareTo(Key other, PartialKey part) {
 +    // check for matching row
 +    int result = WritableComparator.compareBytes(row, 0, row.length, other.row, 0, other.row.length);
 +    if (result != 0 || part.equals(PartialKey.ROW)) // stop at the first difference, or once the last requested field has been compared
 +      return result;
 +    
 +    // check for matching column family
 +    result = WritableComparator.compareBytes(colFamily, 0, colFamily.length, other.colFamily, 0, other.colFamily.length);
 +    if (result != 0 || part.equals(PartialKey.ROW_COLFAM))
 +      return result;
 +    
 +    // check for matching column qualifier
 +    result = WritableComparator.compareBytes(colQualifier, 0, colQualifier.length, other.colQualifier, 0, other.colQualifier.length);
 +    if (result != 0 || part.equals(PartialKey.ROW_COLFAM_COLQUAL))
 +      return result;
 +    
 +    // check for matching column visibility
 +    result = WritableComparator.compareBytes(colVisibility, 0, colVisibility.length, other.colVisibility, 0, other.colVisibility.length);
 +    if (result != 0 || part.equals(PartialKey.ROW_COLFAM_COLQUAL_COLVIS))
 +      return result;
 +    
 +    // check for matching timestamp
 +    if (timestamp < other.timestamp)
 +      result = 1; // inverted on purpose: a smaller (older) timestamp sorts later
 +    else if (timestamp > other.timestamp)
 +      result = -1;
 +    else
 +      result = 0;
 +    
 +    if (result != 0 || part.equals(PartialKey.ROW_COLFAM_COLQUAL_COLVIS_TIME))
 +      return result;
 +    
 +    // check for matching deleted flag
 +    if (deleted)
 +      result = other.deleted ? 0 : -1; // a set delete marker sorts before an unset one
 +    else
 +      result = other.deleted ? 1 : 0;
 +    
 +    return result;
 +  }
 +  
 +  /**
 +   * Compare all elements of a key. The elements (row, column family, column qualifier, column visibility, timestamp, and delete marker) are compared in order
 +   * until an unequal element is found. If the row is equal, then compare the column family, etc. The row, column family, column qualifier, and column
 +   * visibility are compared lexicographically and sorted ascending. The timestamps are compared numerically and sorted descending so that the most recent data
 +   * comes first. Lastly, a delete marker of true sorts before a delete marker of false.
 +   */
 +  
 +  public int compareTo(Key other) {
 +    return compareTo(other, PartialKey.ROW_COLFAM_COLQUAL_COLVIS_TIME_DEL);
 +  }
 +  
 +  @Override
 +  public int hashCode() {
 +    return WritableComparator.hashBytes(row, row.length) + WritableComparator.hashBytes(colFamily, colFamily.length)
 +        + WritableComparator.hashBytes(colQualifier, colQualifier.length) + WritableComparator.hashBytes(colVisibility, colVisibility.length)
 +        + (int) (timestamp ^ (timestamp >>> 32)); // folds the 64-bit timestamp into 32 bits; the delete marker is not part of the hash
 +  }
 +  
 +  public static String toPrintableString(byte ba[], int offset, int len, int maxLen) {
 +    return appendPrintableString(ba, offset, len, maxLen, new StringBuilder()).toString();
 +  }
 +  
 +  public static StringBuilder appendPrintableString(byte ba[], int offset, int len, int maxLen, StringBuilder sb) {
 +    int plen = Math.min(len, maxLen); // at most maxLen bytes are rendered
 +    
 +    for (int i = 0; i < plen; i++) {
 +      int c = 0xff & ba[offset + i]; // read the byte as an unsigned value
 +      if (c >= 32 && c <= 126) // printable ASCII is appended verbatim
 +        sb.append((char) c);
 +      else
 +        sb.append("%" + String.format("%02x;", c)); // anything else becomes '%', two lowercase hex digits and ';'
 +    }
 +    
 +    if (len > maxLen) { // tell the reader that bytes were left out
 +      sb.append("... TRUNCATED");
 +    }
 +    
 +    return sb;
 +  }
 +  
 +  private StringBuilder rowColumnStringBuilder() {
 +    StringBuilder sb = new StringBuilder();
 +    appendPrintableString(row, 0, row.length, Constants.MAX_DATA_TO_PRINT, sb);
 +    sb.append(" ");
 +    appendPrintableString(colFamily, 0, colFamily.length, Constants.MAX_DATA_TO_PRINT, sb);
 +    sb.append(":");
 +    appendPrintableString(colQualifier, 0, colQualifier.length, Constants.MAX_DATA_TO_PRINT, sb);
 +    sb.append(" [");
 +    appendPrintableString(colVisibility, 0, colVisibility.length, Constants.MAX_DATA_TO_PRINT, sb);
 +    sb.append("]");
 +    return sb;
 +  }
 +  
 +  public String toString() {
 +    StringBuilder sb = rowColumnStringBuilder();
 +    sb.append(" ");
 +    sb.append(Long.toString(timestamp));
 +    sb.append(" ");
 +    sb.append(deleted);
 +    return sb.toString();
 +  }
 +  
 +  public String toStringNoTime() {
 +    return rowColumnStringBuilder().toString();
 +  }
 +  
 +  /**
 +   * Returns the sum of the lengths of the row, column family, column qualifier, and visibility.
 +   * 
 +   * @return row.length + colFamily.length + colQualifier.length + colVisibility.length;
 +   */
 +  public int getLength() {
 +    return row.length + colFamily.length + colQualifier.length + colVisibility.length;
 +  }
 +  
 +  /**
 +   * Same as {@link #getLength()}.
 +   */
 +  public int getSize() {
 +    return getLength();
 +  }
 +  
 +  private static boolean isEqual(byte a1[], byte a2[]) {
 +    if (a1 == a2) // same array instance, nothing to compare
 +      return true;
 +    
 +    int last = a1.length;
 +    
 +    if (last != a2.length)
 +      return false;
 +    
 +    if (last == 0)
 +      return true;
 +    
 +    // since sorted data is usually compared in accumulo,
 +    // the prefixes will normally be the same... so compare
 +    // the last byte first.. the most likely place
 +    // to have disorder is at the end of the arrays when the
 +    // data is sorted... if the last bytes match, compare the rest
 +    // of the data forward... comparing backwards is slower
 +    // (compiler and cpu optimized for reading data forward)..
 +    // do not want slower comparisons when data is equal...
 +    // sorting brings equal data together
 +    
 +    last--;
 +    
 +    if (a1[last] == a2[last]) {
 +      for (int i = 0; i < last; i++)
 +        if (a1[i] != a2[i])
 +          return false;
 +    } else {
 +      return false;
 +    }
 +    
 +    return true;
 +    
 +  }
 +  
 +  /**
 +   * Use this to compress a list of keys before sending them via thrift. For every entry after the first, each of row, column family, column qualifier and
 +   * column visibility that equals the same field of the preceding key is set to null in the thrift key; {@link #decompress(List)} restores them.
 +   * @param param
 +   *          a list of key/value pairs
 +   */
 +  public static List<TKeyValue> compress(List<? extends KeyValue> param) {
 +    
 +    List<TKeyValue> tkvl = Arrays.asList(new TKeyValue[param.size()]); // fixed-size list over a pre-sized array, filled by index below
 +    
 +    if (param.size() > 0)
 +      tkvl.set(0, new TKeyValue(param.get(0).key.toThrift(), ByteBuffer.wrap(param.get(0).value))); // the first key is always sent in full
 +    
 +    for (int i = param.size() - 1; i > 0; i--) {
 +      Key prevKey = param.get(i - 1).key;
 +      KeyValue kv = param.get(i);
 +      Key key = kv.key;
 +      
 +      TKey newKey = null; // created lazily by whichever branch needs it first
 +      
 +      if (isEqual(prevKey.row, key.row)) {
 +        newKey = key.toThrift();
 +        newKey.row = null;
 +      }
 +      
 +      if (isEqual(prevKey.colFamily, key.colFamily)) {
 +        if (newKey == null)
 +          newKey = key.toThrift();
 +        newKey.colFamily = null;
 +      }
 +      
 +      if (isEqual(prevKey.colQualifier, key.colQualifier)) {
 +        if (newKey == null)
 +          newKey = key.toThrift();
 +        newKey.colQualifier = null;
 +      }
 +      
 +      if (isEqual(prevKey.colVisibility, key.colVisibility)) {
 +        if (newKey == null)
 +          newKey = key.toThrift();
 +        newKey.colVisibility = null;
 +      }
 +      
 +      if (newKey == null) // no field matched the previous key, so send this key in full
 +        newKey = key.toThrift();
 +      
 +      tkvl.set(i, new TKeyValue(newKey, ByteBuffer.wrap(kv.value)));
 +    }
 +    
 +    return tkvl;
 +  }
 +  
 +  /**
 +   * Use this to decompress a list of keys received from thrift.
 +   * The list is modified in place: each null row, column family, column qualifier or column visibility is replaced by the preceding key's value.
 +   * @param param the thrift key/value pairs to restore; the first entry is left untouched
 +   */
 +  
 +  public static void decompress(List<TKeyValue> param) {
 +    for (int i = 1; i < param.size(); i++) { // forward order matters: prevKey has already been restored when key is processed
 +      TKey prevKey = param.get(i - 1).key;
 +      TKey key = param.get(i).key;
 +      
 +      if (key.row == null) {
 +        key.row = prevKey.row; // the field value is shared with the previous key, not copied
 +      }
 +      if (key.colFamily == null) {
 +        key.colFamily = prevKey.colFamily;
 +      }
 +      if (key.colQualifier == null) {
 +        key.colQualifier = prevKey.colQualifier;
 +      }
 +      if (key.colVisibility == null) {
 +        key.colVisibility = prevKey.colVisibility;
 +      }
 +    }
 +  }
 +  
 +  byte[] getRowBytes() {
 +    return row;
 +  }
 +  
 +  byte[] getColFamily() {
 +    return colFamily;
 +  }
 +  
 +  byte[] getColQualifier() {
 +    return colQualifier;
 +  }
 +  
 +  byte[] getColVisibility() {
 +    return colVisibility;
 +  }
 +  
 +  public TKey toThrift() {
 +    return new TKey(ByteBuffer.wrap(row), ByteBuffer.wrap(colFamily), ByteBuffer.wrap(colQualifier), ByteBuffer.wrap(colVisibility), timestamp);
 +  }
 +  
 +  @Override
 +  public Object clone() throws CloneNotSupportedException {
 +    Key r = (Key) super.clone(); // copies timestamp and deleted, but only the references to the byte arrays
 +    r.row = Arrays.copyOf(row, row.length); // so each array is duplicated and the clone shares no storage with this key
 +    r.colFamily = Arrays.copyOf(colFamily, colFamily.length);
 +    r.colQualifier = Arrays.copyOf(colQualifier, colQualifier.length);
 +    r.colVisibility = Arrays.copyOf(colVisibility, colVisibility.length);
 +    return r;
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a5e3ed3b/core/src/test/java/org/apache/accumulo/core/data/KeyTest.java
----------------------------------------------------------------------
diff --cc core/src/test/java/org/apache/accumulo/core/data/KeyTest.java
index a13f2bd,0000000..56442a1
mode 100644,000000..100644
--- a/core/src/test/java/org/apache/accumulo/core/data/KeyTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/data/KeyTest.java
@@@ -1,110 -1,0 +1,136 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.data;
 +
- import junit.framework.TestCase;
++import static org.junit.Assert.assertEquals;
++import static org.junit.Assert.assertNotSame;
++import static org.junit.Assert.assertSame;
++import static org.junit.Assert.assertTrue;
 +
++import org.apache.accumulo.core.data.thrift.TKey;
 +import org.apache.accumulo.core.security.ColumnVisibility;
 +import org.apache.hadoop.io.Text;
++import org.junit.Test;
 +
- public class KeyTest extends TestCase {
++public class KeyTest {
++
++  @Test
 +  public void testDeletedCompare() {
 +    Key k1 = new Key("r1".getBytes(), "cf".getBytes(), "cq".getBytes(), new byte[0], 0, false);
 +    Key k2 = new Key("r1".getBytes(), "cf".getBytes(), "cq".getBytes(), new byte[0], 0, false);
 +    Key k3 = new Key("r1".getBytes(), "cf".getBytes(), "cq".getBytes(), new byte[0], 0, true);
 +    Key k4 = new Key("r1".getBytes(), "cf".getBytes(), "cq".getBytes(), new byte[0], 0, true);
 +    
 +    assertTrue(k1.equals(k2));
 +    assertTrue(k3.equals(k4));
 +    assertTrue(k1.compareTo(k3) > 0);
 +    assertTrue(k3.compareTo(k1) < 0);
 +  }
 +  
++  @Test
 +  public void testCopyData() {
 +    byte[] rowBytes = "r".getBytes();
 +    byte[] famBytes = "cf".getBytes();
 +    byte[] qualBytes = "cq".getBytes();
 +    byte[] visBytes = "cv".getBytes();
 +    
 +    // the two keys differ only in the final constructor flag
 +    Key shared = new Key(rowBytes, famBytes, qualBytes, visBytes, 5L, false, false);
 +    Key copied = new Key(rowBytes, famBytes, qualBytes, visBytes, 5L, false, true);
 +    
 +    // flag false: the key must hold the very arrays that were passed in
 +    assertSame(rowBytes, shared.getRowBytes());
 +    assertSame(famBytes, shared.getColFamily());
 +    assertSame(qualBytes, shared.getColQualifier());
 +    assertSame(visBytes, shared.getColVisibility());
 +    
 +    assertSame(rowBytes, shared.getRowData().getBackingArray());
 +    assertSame(famBytes, shared.getColumnFamilyData().getBackingArray());
 +    assertSame(qualBytes, shared.getColumnQualifierData().getBackingArray());
 +    assertSame(visBytes, shared.getColumnVisibilityData().getBackingArray());
 +    
 +    // flag true: the key must hold different array instances
 +    assertNotSame(rowBytes, copied.getRowBytes());
 +    assertNotSame(famBytes, copied.getColFamily());
 +    assertNotSame(qualBytes, copied.getColQualifier());
 +    assertNotSame(visBytes, copied.getColVisibility());
 +    
 +    assertNotSame(rowBytes, copied.getRowData().getBackingArray());
 +    assertNotSame(famBytes, copied.getColumnFamilyData().getBackingArray());
 +    assertNotSame(qualBytes, copied.getColumnQualifierData().getBackingArray());
 +    assertNotSame(visBytes, copied.getColumnVisibilityData().getBackingArray());
 +    
 +    // copying the arrays must not affect equality
 +    assertEquals(shared, copied);
 +    
 +  }
 +  
++  @Test
 +  public void testString() {
 +    // each String-based constructor must produce a key equal to the one built by the matching Text-based constructor
 +    Key fromText = new Key(new Text("r1"));
 +    Key fromString = new Key("r1");
 +    assertEquals(fromText, fromString);
 +    
 +    fromText = new Key(new Text("r1"), new Text("cf1"));
 +    fromString = new Key("r1", "cf1");
 +    assertEquals(fromText, fromString);
 +    
 +    fromText = new Key(new Text("r1"), new Text("cf2"), new Text("cq2"));
 +    fromString = new Key("r1", "cf2", "cq2");
 +    assertEquals(fromText, fromString);
 +    
 +    fromText = new Key(new Text("r1"), new Text("cf2"), new Text("cq2"), new Text("cv"));
 +    fromString = new Key("r1", "cf2", "cq2", "cv");
 +    assertEquals(fromText, fromString);
 +    
 +    fromText = new Key(new Text("r1"), new Text("cf2"), new Text("cq2"), new Text("cv"), 89);
 +    fromString = new Key("r1", "cf2", "cq2", "cv", 89);
 +    assertEquals(fromText, fromString);
 +    
 +    fromText = new Key(new Text("r1"), new Text("cf2"), new Text("cq2"), 89);
 +    fromString = new Key("r1", "cf2", "cq2", 89);
 +    assertEquals(fromText, fromString);
 +    
 +  }
 +  
++  @Test
 +  public void testVisibilityFollowingKey() {
 +    Key k = new Key("r", "f", "q", "v");
 +    // expected value goes first so that a failure message reports expected/actual the right way round
 +    String expected = "r f:q [v%00;] " + Long.MAX_VALUE + " false";
 +    assertEquals(expected, k.followingKey(PartialKey.ROW_COLFAM_COLQUAL_COLVIS).toString());
 +  }
 +  
 +  public void testVisibilityGetters() {
 +    Key k = new Key("r", "f", "q", "v1|(v2&v3)");
 +    
 +    Text expression = k.getColumnVisibility();
 +    ColumnVisibility parsed = k.getColumnVisibilityParsed();
 +    
 +    assertEquals(expression, new Text(parsed.getExpression()));
 +  }
++
++  @Test
++  public void testThrift() {
++    // a key converted to thrift and back must equal the original
++    Key original = new Key("r1", "cf2", "cq2", "cv");
++    Key roundTripped = new Key(original.toThrift());
++    assertEquals(original, roundTripped);
++  }
++
++  @Test(expected=IllegalArgumentException.class)
++  public void testThrift_Invalid() {
++    TKey thriftKey = new Key("r1", "cf2", "cq2", "cv").toThrift();
++    // a thrift key with no row is expected to be rejected by the Key constructor
++    thriftKey.setRow((byte[]) null);
++    new Key(thriftKey);
++  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a5e3ed3b/core/src/test/java/org/apache/accumulo/core/data/MutationTest.java
----------------------------------------------------------------------
diff --cc core/src/test/java/org/apache/accumulo/core/data/MutationTest.java
index 09026f7,0000000..33b060e
mode 100644,000000..100644
--- a/core/src/test/java/org/apache/accumulo/core/data/MutationTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/data/MutationTest.java
@@@ -1,579 -1,0 +1,612 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.data;
 +
 +import java.io.ByteArrayInputStream;
 +import java.io.ByteArrayOutputStream;
 +import java.io.DataInputStream;
 +import java.io.DataOutputStream;
 +import java.io.IOException;
 +import java.util.Arrays;
 +import java.util.List;
 +
- import junit.framework.TestCase;
++import org.junit.Test;
++import static org.junit.Assert.assertEquals;
++import static org.junit.Assert.assertFalse;
++import static org.junit.Assert.assertTrue;
 +
++import org.apache.accumulo.core.data.thrift.TMutation;
 +import org.apache.accumulo.core.security.ColumnVisibility;
 +import org.apache.hadoop.io.Text;
 +
- public class MutationTest extends TestCase {
++public class MutationTest  {
 +  
 +  /**
 +   * Renders a byte array as lowercase hexadecimal, two digits per byte.
 +   * 
 +   * @param ba
 +   *          the bytes to render
 +   * @return a string of length 2 * ba.length
 +   */
 +  private static String toHexString(byte[] ba) {
 +    StringBuilder str = new StringBuilder(ba.length * 2);
 +    for (byte b : ba) {
 +      // %02x rather than %x: bytes below 0x10 would otherwise lose their leading zero
 +      str.append(String.format("%02x", b));
 +    }
 +    return str.toString();
 +  }
 +
 +  /* Test constructing a Mutation using a byte buffer. The byte array
 +   * returned as the row is converted to a hexadecimal string for easy
 +   * comparison.
 +   */
++  @Test
 +  public void testByteConstructor() {
 +    Mutation m = new Mutation("0123456789".getBytes());
 +    assertEquals("30313233343536373839", toHexString(m.getRow()));
 +  }
 +  
 +  public void testLimitedByteConstructor() {
 +    Mutation m = new Mutation("0123456789".getBytes(), 2, 5);
 +    assertEquals("3233343536", toHexString(m.getRow()));
 +  }
 +  
++  @Test
 +  public void test1() {
 +    // a single put without visibility or timestamp
 +    Mutation mutation = new Mutation(new Text("r1"));
 +    mutation.put(new Text("cf1"), new Text("cq1"), new Value("v1".getBytes()));
 +    
 +    List<ColumnUpdate> columnUpdates = mutation.getUpdates();
 +    assertEquals(1, columnUpdates.size());
 +    
 +    ColumnUpdate only = columnUpdates.get(0);
 +    String family = new String(only.getColumnFamily());
 +    assertEquals("cf1", family);
 +    String qualifier = new String(only.getColumnQualifier());
 +    assertEquals("cq1", qualifier);
 +    String visibility = new String(only.getColumnVisibility());
 +    assertEquals("", visibility);
 +    assertFalse(only.hasTimestamp());
 +    
 +  }
 +  
++  @Test
 +  public void test2() throws IOException {
 +    Mutation m = new Mutation(new Text("r1"));
 +    m.put(new Text("cf1"), new Text("cq1"), new Value("v1".getBytes()));
 +    m.put(new Text("cf2"), new Text("cq2"), 56, new Value("v2".getBytes()));
 +    
 +    List<ColumnUpdate> updates = m.getUpdates();
 +    
 +    assertEquals(2, updates.size());
 +    
 +    assertEquals("r1", new String(m.getRow()));
 +    ColumnUpdate cu = updates.get(0);
 +    
 +    assertEquals("cf1", new String(cu.getColumnFamily()));
 +    assertEquals("cq1", new String(cu.getColumnQualifier()));
 +    assertEquals("", new String(cu.getColumnVisibility()));
 +    assertFalse(cu.hasTimestamp());
 +    
 +    cu = updates.get(1);
 +    
 +    assertEquals("cf2", new String(cu.getColumnFamily()));
 +    assertEquals("cq2", new String(cu.getColumnQualifier()));
 +    assertEquals("", new String(cu.getColumnVisibility()));
 +    assertTrue(cu.hasTimestamp());
 +    assertEquals(56, cu.getTimestamp());
 +    
 +    m = cloneMutation(m);
 +    
 +    assertEquals("r1", new String(m.getRow()));
 +    updates = m.getUpdates();
 +    
 +    assertEquals(2, updates.size());
 +    
 +    cu = updates.get(0);
 +    
 +    assertEquals("cf1", new String(cu.getColumnFamily()));
 +    assertEquals("cq1", new String(cu.getColumnQualifier()));
 +    assertEquals("", new String(cu.getColumnVisibility()));
 +    assertFalse(cu.hasTimestamp());
 +    
 +    cu = updates.get(1);
 +    
 +    assertEquals("cf2", new String(cu.getColumnFamily()));
 +    assertEquals("cq2", new String(cu.getColumnQualifier()));
 +    assertEquals("", new String(cu.getColumnVisibility()));
 +    assertTrue(cu.hasTimestamp());
 +    assertEquals(56, cu.getTimestamp());
 +    
 +  }
 +  
 +  /**
 +   * Copies a mutation by writing it to an in-memory buffer and reading the bytes back into a fresh Mutation.
 +   * 
 +   * @param m
 +   *          the mutation to serialize
 +   * @return a new mutation read from the serialized form of m
 +   */
 +  private Mutation cloneMutation(Mutation m) throws IOException {
 +    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
 +    DataOutputStream out = new DataOutputStream(buffer);
 +    m.write(out);
 +    out.close();
 +    
 +    DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
 +    
 +    Mutation copy = new Mutation();
 +    copy.readFields(in);
 +    return copy;
 +  }
 +  
++  @Test
 +  public void test3() throws IOException {
 +    Mutation m = new Mutation(new Text("r1"));
 +    for (int i = 0; i < 5; i++) {
 +      int len = Mutation.VALUE_SIZE_COPY_CUTOFF - 2 + i;
 +      byte val[] = new byte[len];
 +      for (int j = 0; j < len; j++)
 +        val[j] = (byte) i;
 +      
 +      m.put(new Text("cf" + i), new Text("cq" + i), new Value(val));
 +      
 +    }
 +    
 +    for (int r = 0; r < 3; r++) {
 +      assertEquals("r1", new String(m.getRow()));
 +      List<ColumnUpdate> updates = m.getUpdates();
 +      assertEquals(5, updates.size());
 +      for (int i = 0; i < 5; i++) {
 +        ColumnUpdate cu = updates.get(i);
 +        assertEquals("cf" + i, new String(cu.getColumnFamily()));
 +        assertEquals("cq" + i, new String(cu.getColumnQualifier()));
 +        assertEquals("", new String(cu.getColumnVisibility()));
 +        assertFalse(cu.hasTimestamp());
 +        
 +        byte[] val = cu.getValue();
 +        int len = Mutation.VALUE_SIZE_COPY_CUTOFF - 2 + i;
 +        assertEquals(len, val.length);
 +        for (int j = 0; j < len; j++)
 +          assertEquals(i, val[j]);
 +      }
 +      
 +      m = cloneMutation(m);
 +    }
 +  }
 +  
 +  private Text nt(String s) {
 +    return new Text(s);
 +  }
 +  
 +  private Value nv(String s) {
 +    return new Value(s.getBytes());
 +  }
 +  
++  @Test
 +  public void testPuts() {
 +    Mutation m = new Mutation(new Text("r1"));
 +    
 +    m.put(nt("cf1"), nt("cq1"), nv("v1"));
 +    m.put(nt("cf2"), nt("cq2"), new ColumnVisibility("cv2"), nv("v2"));
 +    m.put(nt("cf3"), nt("cq3"), 3l, nv("v3"));
 +    m.put(nt("cf4"), nt("cq4"), new ColumnVisibility("cv4"), 4l, nv("v4"));
 +    
 +    m.putDelete(nt("cf5"), nt("cq5"));
 +    m.putDelete(nt("cf6"), nt("cq6"), new ColumnVisibility("cv6"));
 +    m.putDelete(nt("cf7"), nt("cq7"), 7l);
 +    m.putDelete(nt("cf8"), nt("cq8"), new ColumnVisibility("cv8"), 8l);
 +    
 +    assertEquals(8, m.size());
 +    
 +    List<ColumnUpdate> updates = m.getUpdates();
 +    
 +    assertEquals(8, m.size());
 +    assertEquals(8, updates.size());
 +    
-     assertEquals(updates.get(0), "cf1", "cq1", "", 0l, false, false, "v1");
-     assertEquals(updates.get(1), "cf2", "cq2", "cv2", 0l, false, false, "v2");
-     assertEquals(updates.get(2), "cf3", "cq3", "", 3l, true, false, "v3");
-     assertEquals(updates.get(3), "cf4", "cq4", "cv4", 4l, true, false, "v4");
++    verifyColumnUpdate(updates.get(0), "cf1", "cq1", "", 0l, false, false, "v1");
++    verifyColumnUpdate(updates.get(1), "cf2", "cq2", "cv2", 0l, false, false, "v2");
++    verifyColumnUpdate(updates.get(2), "cf3", "cq3", "", 3l, true, false, "v3");
++    verifyColumnUpdate(updates.get(3), "cf4", "cq4", "cv4", 4l, true, false, "v4");
 +    
-     assertEquals(updates.get(4), "cf5", "cq5", "", 0l, false, true, "");
-     assertEquals(updates.get(5), "cf6", "cq6", "cv6", 0l, false, true, "");
-     assertEquals(updates.get(6), "cf7", "cq7", "", 7l, true, true, "");
-     assertEquals(updates.get(7), "cf8", "cq8", "cv8", 8l, true, true, "");
++    verifyColumnUpdate(updates.get(4), "cf5", "cq5", "", 0l, false, true, "");
++    verifyColumnUpdate(updates.get(5), "cf6", "cq6", "cv6", 0l, false, true, "");
++    verifyColumnUpdate(updates.get(6), "cf7", "cq7", "", 7l, true, true, "");
++    verifyColumnUpdate(updates.get(7), "cf8", "cq8", "cv8", 8l, true, true, "");
 +    
 +  }
 +  
++  @Test
 +  public void testPutsString() {
 +    Mutation m = new Mutation("r1");
 +    
 +    m.put("cf1", "cq1", nv("v1"));
 +    m.put("cf2", "cq2", new ColumnVisibility("cv2"), nv("v2"));
 +    m.put("cf3", "cq3", 3l, nv("v3"));
 +    m.put("cf4", "cq4", new ColumnVisibility("cv4"), 4l, nv("v4"));
 +    
 +    m.putDelete("cf5", "cq5");
 +    m.putDelete("cf6", "cq6", new ColumnVisibility("cv6"));
 +    m.putDelete("cf7", "cq7", 7l);
 +    m.putDelete("cf8", "cq8", new ColumnVisibility("cv8"), 8l);
 +    
 +    assertEquals(8, m.size());
 +    
 +    List<ColumnUpdate> updates = m.getUpdates();
 +    
 +    assertEquals(8, m.size());
 +    assertEquals(8, updates.size());
 +    
-     assertEquals(updates.get(0), "cf1", "cq1", "", 0l, false, false, "v1");
-     assertEquals(updates.get(1), "cf2", "cq2", "cv2", 0l, false, false, "v2");
-     assertEquals(updates.get(2), "cf3", "cq3", "", 3l, true, false, "v3");
-     assertEquals(updates.get(3), "cf4", "cq4", "cv4", 4l, true, false, "v4");
++    verifyColumnUpdate(updates.get(0), "cf1", "cq1", "", 0l, false, false, "v1");
++    verifyColumnUpdate(updates.get(1), "cf2", "cq2", "cv2", 0l, false, false, "v2");
++    verifyColumnUpdate(updates.get(2), "cf3", "cq3", "", 3l, true, false, "v3");
++    verifyColumnUpdate(updates.get(3), "cf4", "cq4", "cv4", 4l, true, false, "v4");
 +    
-     assertEquals(updates.get(4), "cf5", "cq5", "", 0l, false, true, "");
-     assertEquals(updates.get(5), "cf6", "cq6", "cv6", 0l, false, true, "");
-     assertEquals(updates.get(6), "cf7", "cq7", "", 7l, true, true, "");
-     assertEquals(updates.get(7), "cf8", "cq8", "cv8", 8l, true, true, "");
++    verifyColumnUpdate(updates.get(4), "cf5", "cq5", "", 0l, false, true, "");
++    verifyColumnUpdate(updates.get(5), "cf6", "cq6", "cv6", 0l, false, true, "");
++    verifyColumnUpdate(updates.get(6), "cf7", "cq7", "", 7l, true, true, "");
++    verifyColumnUpdate(updates.get(7), "cf8", "cq8", "cv8", 8l, true, true, "");
 +  }
 +  
++  @Test
 +  public void testPutsStringString() {
 +    Mutation m = new Mutation("r1");
 +    
 +    m.put("cf1", "cq1", "v1");
 +    m.put("cf2", "cq2", new ColumnVisibility("cv2"), "v2");
 +    m.put("cf3", "cq3", 3l, "v3");
 +    m.put("cf4", "cq4", new ColumnVisibility("cv4"), 4l, "v4");
 +    
 +    m.putDelete("cf5", "cq5");
 +    m.putDelete("cf6", "cq6", new ColumnVisibility("cv6"));
 +    m.putDelete("cf7", "cq7", 7l);
 +    m.putDelete("cf8", "cq8", new ColumnVisibility("cv8"), 8l);
 +    
 +    assertEquals(8, m.size());
 +    assertEquals("r1", new String(m.getRow()));
 +    
 +    List<ColumnUpdate> updates = m.getUpdates();
 +    
 +    assertEquals(8, m.size());
 +    assertEquals(8, updates.size());
 +    
-     assertEquals(updates.get(0), "cf1", "cq1", "", 0l, false, false, "v1");
-     assertEquals(updates.get(1), "cf2", "cq2", "cv2", 0l, false, false, "v2");
-     assertEquals(updates.get(2), "cf3", "cq3", "", 3l, true, false, "v3");
-     assertEquals(updates.get(3), "cf4", "cq4", "cv4", 4l, true, false, "v4");
++    verifyColumnUpdate(updates.get(0), "cf1", "cq1", "", 0l, false, false, "v1");
++    verifyColumnUpdate(updates.get(1), "cf2", "cq2", "cv2", 0l, false, false, "v2");
++    verifyColumnUpdate(updates.get(2), "cf3", "cq3", "", 3l, true, false, "v3");
++    verifyColumnUpdate(updates.get(3), "cf4", "cq4", "cv4", 4l, true, false, "v4");
 +    
-     assertEquals(updates.get(4), "cf5", "cq5", "", 0l, false, true, "");
-     assertEquals(updates.get(5), "cf6", "cq6", "cv6", 0l, false, true, "");
-     assertEquals(updates.get(6), "cf7", "cq7", "", 7l, true, true, "");
-     assertEquals(updates.get(7), "cf8", "cq8", "cv8", 8l, true, true, "");
++    verifyColumnUpdate(updates.get(4), "cf5", "cq5", "", 0l, false, true, "");
++    verifyColumnUpdate(updates.get(5), "cf6", "cq6", "cv6", 0l, false, true, "");
++    verifyColumnUpdate(updates.get(6), "cf7", "cq7", "", 7l, true, true, "");
++    verifyColumnUpdate(updates.get(7), "cf8", "cq8", "cv8", 8l, true, true, "");
 +  }
 +  
 +  public void testByteArrays() {
 +    Mutation m = new Mutation("r1".getBytes());
 +    
 +    m.put("cf1".getBytes(), "cq1".getBytes(), "v1".getBytes());
 +    m.put("cf2".getBytes(), "cq2".getBytes(), new ColumnVisibility("cv2"), "v2".getBytes());
 +    m.put("cf3".getBytes(), "cq3".getBytes(), 3l, "v3".getBytes());
 +    m.put("cf4".getBytes(), "cq4".getBytes(), new ColumnVisibility("cv4"), 4l, "v4".getBytes());
 +    
 +    m.putDelete("cf5".getBytes(), "cq5".getBytes());
 +    m.putDelete("cf6".getBytes(), "cq6".getBytes(), new ColumnVisibility("cv6"));
 +    m.putDelete("cf7".getBytes(), "cq7".getBytes(), 7l);
 +    m.putDelete("cf8".getBytes(), "cq8".getBytes(), new ColumnVisibility("cv8"), 8l);
 +    
 +    assertEquals(8, m.size());
 +    
 +    List<ColumnUpdate> updates = m.getUpdates();
 +    
 +    assertEquals(8, m.size());
 +    assertEquals(8, updates.size());
 +    
-     assertEquals(updates.get(0), "cf1", "cq1", "", 0l, false, false, "v1");
-     assertEquals(updates.get(1), "cf2", "cq2", "cv2", 0l, false, false, "v2");
-     assertEquals(updates.get(2), "cf3", "cq3", "", 3l, true, false, "v3");
-     assertEquals(updates.get(3), "cf4", "cq4", "cv4", 4l, true, false, "v4");
++    verifyColumnUpdate(updates.get(0), "cf1", "cq1", "", 0l, false, false, "v1");
++    verifyColumnUpdate(updates.get(1), "cf2", "cq2", "cv2", 0l, false, false, "v2");
++    verifyColumnUpdate(updates.get(2), "cf3", "cq3", "", 3l, true, false, "v3");
++    verifyColumnUpdate(updates.get(3), "cf4", "cq4", "cv4", 4l, true, false, "v4");
 +    
-     assertEquals(updates.get(4), "cf5", "cq5", "", 0l, false, true, "");
-     assertEquals(updates.get(5), "cf6", "cq6", "cv6", 0l, false, true, "");
-     assertEquals(updates.get(6), "cf7", "cq7", "", 7l, true, true, "");
-     assertEquals(updates.get(7), "cf8", "cq8", "cv8", 8l, true, true, "");
++    verifyColumnUpdate(updates.get(4), "cf5", "cq5", "", 0l, false, true, "");
++    verifyColumnUpdate(updates.get(5), "cf6", "cq6", "cv6", 0l, false, true, "");
++    verifyColumnUpdate(updates.get(6), "cf7", "cq7", "", 7l, true, true, "");
++    verifyColumnUpdate(updates.get(7), "cf8", "cq8", "cv8", 8l, true, true, "");
 +  }
 +
 +  /**
 +   * Test for regression on bug 3422. If a {@link Mutation} object is reused for multiple calls to readFields, the mutation would previously be "locked in" to
 +   * the first set of column updates (and value lengths). Hadoop input formats reuse objects when reading, so if Mutations are used with an input format (or as
 +   * the input to a combiner or reducer) then they will be used in this fashion.
 +   */
++  @Test
 +  public void testMultipleReadFieldsCalls() throws IOException {
 +    // Create test mutations and write them to a byte output stream
 +    Mutation m1 = new Mutation("row1");
 +    m1.put("cf1.1", "cq1.1", new ColumnVisibility("A|B"), "val1.1");
 +    m1.put("cf1.2", "cq1.2", new ColumnVisibility("C|D"), "val1.2");
 +    byte[] val1_3 = new byte[Mutation.VALUE_SIZE_COPY_CUTOFF + 3];
 +    Arrays.fill(val1_3, (byte) 3);
 +    m1.put("cf1.3", "cq1.3", new ColumnVisibility("E|F"), new String(val1_3));
 +    int size1 = m1.size();
 +    long nb1 = m1.numBytes();
 +    
 +    Mutation m2 = new Mutation("row2");
 +    byte[] val2 = new byte[Mutation.VALUE_SIZE_COPY_CUTOFF + 2];
 +    Arrays.fill(val2, (byte) 2);
 +    m2.put("cf2", "cq2", new ColumnVisibility("G|H"), 1234, new String(val2));
 +    int size2 = m2.size();
 +    long nb2 = m2.numBytes();
 +    
 +    ByteArrayOutputStream bos = new ByteArrayOutputStream();
 +    DataOutputStream dos = new DataOutputStream(bos);
 +    m1.write(dos);
 +    m2.write(dos);
 +    dos.close();
 +    
 +    // Now read the mutations back in from the byte array, making sure to
 +    // reuse the same mutation object, and make sure everything is correct.
 +    ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
 +    DataInputStream dis = new DataInputStream(bis);
 +    
 +    Mutation m = new Mutation();
 +    m.readFields(dis);
 +    
 +    assertEquals("row1", new String(m.getRow()));
 +    assertEquals(size1, m.size());
 +    assertEquals(nb1, m.numBytes());
 +    assertEquals(3, m.getUpdates().size());
-     assertEquals(m.getUpdates().get(0), "cf1.1", "cq1.1", "A|B", 0L, false, false, "val1.1");
-     assertEquals(m.getUpdates().get(1), "cf1.2", "cq1.2", "C|D", 0L, false, false, "val1.2");
-     assertEquals(m.getUpdates().get(2), "cf1.3", "cq1.3", "E|F", 0L, false, false, new String(val1_3));
++    verifyColumnUpdate(m.getUpdates().get(0), "cf1.1", "cq1.1", "A|B", 0L, false, false, "val1.1");
++    verifyColumnUpdate(m.getUpdates().get(1), "cf1.2", "cq1.2", "C|D", 0L, false, false, "val1.2");
++    verifyColumnUpdate(m.getUpdates().get(2), "cf1.3", "cq1.3", "E|F", 0L, false, false, new String(val1_3));
 +    
 +    // Reuse the same mutation object (which is what happens in the hadoop framework
 +    // when objects are read by an input format)
 +    m.readFields(dis);
 +    
 +    assertEquals("row2", new String(m.getRow()));
 +    assertEquals(size2, m.size());
 +    assertEquals(nb2, m.numBytes());
 +    assertEquals(1, m.getUpdates().size());
-     assertEquals(m.getUpdates().get(0), "cf2", "cq2", "G|H", 1234L, true, false, new String(val2));
++    verifyColumnUpdate(m.getUpdates().get(0), "cf2", "cq2", "G|H", 1234L, true, false, new String(val2));
 +  }
 +  
-   private void assertEquals(ColumnUpdate cu, String cf, String cq, String cv, long ts, boolean timeSet, boolean deleted, String val) {
++  /**
++   * Asserts that a column update carries the given family, qualifier, visibility, deletion flag and value. The timestamp is compared only when timeSet is
++   * true.
++   */
++  private void verifyColumnUpdate(ColumnUpdate cu, String cf, String cq, String cv, long ts, boolean timeSet, boolean deleted, String val) {
 +    
 +    String actualFamily = new String(cu.getColumnFamily());
 +    assertEquals(cf, actualFamily);
 +    String actualQualifier = new String(cu.getColumnQualifier());
 +    assertEquals(cq, actualQualifier);
 +    String actualVisibility = new String(cu.getColumnVisibility());
 +    assertEquals(cv, actualVisibility);
 +    assertEquals(timeSet, cu.hasTimestamp());
 +    if (timeSet) {
 +      assertEquals(ts, cu.getTimestamp());
 +    }
 +    assertEquals(deleted, cu.isDeleted());
 +    String actualValue = new String(cu.getValue());
 +    assertEquals(val, actualValue);
 +  }
 +  
++  @Test
 +  public void test4() throws Exception {
 +    Mutation m1 = new Mutation(new Text("r1"));
 +    
 +    m1.put(nt("cf1"), nt("cq1"), nv("v1"));
 +    m1.put(nt("cf2"), nt("cq2"), new ColumnVisibility("cv2"), nv("v2"));
 +    
 +    ByteArrayOutputStream bos = new ByteArrayOutputStream();
 +    DataOutputStream dos = new DataOutputStream(bos);
 +    m1.write(dos);
 +    dos.close();
 +    
 +    Mutation m2 = new Mutation(new Text("r2"));
 +    
 +    m2.put(nt("cf3"), nt("cq3"), nv("v3"));
 +    m2.put(nt("cf4"), nt("cq4"), new ColumnVisibility("cv2"), nv("v4"));
 +    
 +    ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
 +    DataInputStream dis = new DataInputStream(bis);
 +    
 +    // used to be a bug where puts done before readFields would be seen
 +    // after readFields
 +    m2.readFields(dis);
 +    
 +    assertEquals("r1", new String(m2.getRow()));
 +    assertEquals(2, m2.getUpdates().size());
 +    assertEquals(2, m2.size());
-     assertEquals(m2.getUpdates().get(0), "cf1", "cq1", "", 0l, false, false, "v1");
-     assertEquals(m2.getUpdates().get(1), "cf2", "cq2", "cv2", 0l, false, false, "v2");
++    verifyColumnUpdate(m2.getUpdates().get(0), "cf1", "cq1", "", 0l, false, false, "v1");
++    verifyColumnUpdate(m2.getUpdates().get(1), "cf2", "cq2", "cv2", 0l, false, false, "v2");
 +  }
 +  
 +  /**
 +   * Turns an OldMutation into a Mutation by writing the old object to an in-memory buffer and reading those bytes back with Mutation.readFields.
 +   * 
 +   * @param old
 +   *          the mutation to serialize
 +   * @return a new Mutation read from the serialized form of old
 +   */
 +  Mutation convert(OldMutation old) throws IOException {
 +    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
 +    DataOutputStream out = new DataOutputStream(buffer);
 +    old.write(out);
 +    out.close();
 +    
 +    DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
 +    Mutation converted = new Mutation();
 +    converted.readFields(in);
 +    in.close();
 +    return converted;
 +  }
 +  
-   
++  @Test
 +  public void testNewSerialization() throws Exception {
 +    // write an old mutation
 +    OldMutation m2 = new OldMutation("r1");
 +    m2.put("cf1", "cq1", "v1");
 +    m2.put("cf2", "cq2", new ColumnVisibility("cv2"), "v2");
 +    m2.putDelete("cf3", "cq3");
 +    ByteArrayOutputStream bos = new ByteArrayOutputStream();
 +    DataOutputStream dos = new DataOutputStream(bos);
 +    m2.write(dos);
 +    dos.close();
 +    long oldSize = dos.size(); 
 +    ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
 +    DataInputStream dis = new DataInputStream(bis);
 +    m2.readFields(dis);
 +    dis.close();
 +    
 +    // check it
 +    assertEquals("r1", new String(m2.getRow()));
 +    assertEquals(3, m2.getUpdates().size());
 +    assertEquals(3, m2.size());
-     assertEquals(m2.getUpdates().get(0), "cf1", "cq1", "", 0l, false, false, "v1");
-     assertEquals(m2.getUpdates().get(1), "cf2", "cq2", "cv2", 0l, false, false, "v2");
-     assertEquals(m2.getUpdates().get(2), "cf3", "cq3", "", 0l, false, true, "");
++    verifyColumnUpdate(m2.getUpdates().get(0), "cf1", "cq1", "", 0l, false, false, "v1");
++    verifyColumnUpdate(m2.getUpdates().get(1), "cf2", "cq2", "cv2", 0l, false, false, "v2");
++    verifyColumnUpdate(m2.getUpdates().get(2), "cf3", "cq3", "", 0l, false, true, "");
 +
 +    Mutation m1 = convert(m2);
 +    
 +    assertEquals("r1", new String(m1.getRow()));
 +    assertEquals(3, m1.getUpdates().size());
 +    assertEquals(3, m1.size());
-     assertEquals(m1.getUpdates().get(0), "cf1", "cq1", "", 0l, false, false, "v1");
-     assertEquals(m1.getUpdates().get(1), "cf2", "cq2", "cv2", 0l, false, false, "v2");
-     assertEquals(m1.getUpdates().get(2), "cf3", "cq3", "", 0l, false, true, "");
++    verifyColumnUpdate(m1.getUpdates().get(0), "cf1", "cq1", "", 0l, false, false, "v1");
++    verifyColumnUpdate(m1.getUpdates().get(1), "cf2", "cq2", "cv2", 0l, false, false, "v2");
++    verifyColumnUpdate(m1.getUpdates().get(2), "cf3", "cq3", "", 0l, false, true, "");
 +    
 +    Text exampleRow = new Text(" 123456789 123456789 123456789 123456789 123456789");
 +    int exampleLen = exampleRow.getLength();
 +    m1 = new Mutation(exampleRow);
 +    m1.put("", "", "");
 +
 +    bos = new ByteArrayOutputStream();
 +    dos = new DataOutputStream(bos);
 +    m1.write(dos);
 +    dos.close();
 +    long newSize = dos.size();
 +    assertTrue(newSize < oldSize);
 +    assertEquals(10, newSize - exampleLen);
 +    assertEquals(68, oldSize - exampleLen);
 +    // I am converting to integer to avoid comparing floats which are inaccurate
 +    assertEquals(14705, (int)(((newSize-exampleLen) * 100. / (oldSize - exampleLen)) * 1000));
 +    StringBuilder sb = new StringBuilder();
 +    byte[] ba = bos.toByteArray();
 +    for (int i = 0; i < bos.size(); i += 4) {
 +      for (int j = i; j < bos.size() && j < i + 4; j++) {
 +        sb.append(String.format("%02x", ba[j]));
 +      }
 +      sb.append(" ");
 +    }
 +    assertEquals("80322031 32333435 36373839 20313233 34353637 38392031 32333435 36373839 20313233 34353637 38392031 32333435 36373839 06000000 00000001 ", sb.toString());
-     
++
 +  }
 +  
++  @Test
 +  public void testReserialize() throws Exception {
 +    // test reading in a new mutation from an old mutation and reserializing the new mutation... this was failing
 +    OldMutation om = new OldMutation("r1");
 +    om.put("cf1", "cq1", "v1");
 +    om.put("cf2", "cq2", new ColumnVisibility("cv2"), "v2");
 +    om.putDelete("cf3", "cq3");
 +    StringBuilder bigVal = new StringBuilder();
 +    for (int i = 0; i < 100000; i++) {
 +      bigVal.append('a');
 +    }
 +    om.put("cf2", "big", bigVal);
 +
 +    
 +    Mutation m1 = convert(om);
 +
 +    ByteArrayOutputStream bos = new ByteArrayOutputStream();
 +    DataOutputStream dos = new DataOutputStream(bos);
 +    m1.write(dos);
 +    dos.close();
 +    
 +    Mutation m2 = new Mutation();
 +    
 +    ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
 +    DataInputStream dis = new DataInputStream(bis);
 +    m2.readFields(dis);
 +    
 +    // check the row of the reserialized mutation (m2), like every other assertion below, not the input (m1)
 +    assertEquals("r1", new String(m2.getRow()));
 +    assertEquals(4, m2.getUpdates().size());
 +    assertEquals(4, m2.size());
-     assertEquals(m2.getUpdates().get(0), "cf1", "cq1", "", 0l, false, false, "v1");
-     assertEquals(m2.getUpdates().get(1), "cf2", "cq2", "cv2", 0l, false, false, "v2");
-     assertEquals(m2.getUpdates().get(2), "cf3", "cq3", "", 0l, false, true, "");
-     assertEquals(m2.getUpdates().get(3), "cf2", "big", "", 0l, false, false, bigVal.toString());
++    verifyColumnUpdate(m2.getUpdates().get(0), "cf1", "cq1", "", 0l, false, false, "v1");
++    verifyColumnUpdate(m2.getUpdates().get(1), "cf2", "cq2", "cv2", 0l, false, false, "v2");
++    verifyColumnUpdate(m2.getUpdates().get(2), "cf3", "cq3", "", 0l, false, true, "");
++    verifyColumnUpdate(m2.getUpdates().get(3), "cf2", "big", "", 0l, false, false, bigVal.toString());
 +  }
- 
++  
++  @Test
 +  public void testEquals() {
 +    Mutation m1 = new Mutation("r1");
 +    
 +    m1.put("cf1", "cq1", "v1");
 +    m1.put("cf1", "cq1", new ColumnVisibility("A&B"), "v2");
 +    m1.put("cf1", "cq1", 3, "v3");
 +    m1.put("cf1", "cq1", new ColumnVisibility("A&B&C"), 4, "v4");
 +    m1.putDelete("cf2", "cf3");
 +    m1.putDelete("cf2", "cf4", 3);
 +    m1.putDelete("cf2", "cf4", new ColumnVisibility("A&B&C"), 3);
 +    
 +    // m2 has same data as m1
 +    Mutation m2 = new Mutation("r1");
 +    
 +    m2.put("cf1", "cq1", "v1");
 +    m2.put("cf1", "cq1", new ColumnVisibility("A&B"), "v2");
 +    m2.put("cf1", "cq1", 3, "v3");
 +    m2.put("cf1", "cq1", new ColumnVisibility("A&B&C"), 4, "v4");
 +    m2.putDelete("cf2", "cf3");
 +    m2.putDelete("cf2", "cf4", 3);
 +    m2.putDelete("cf2", "cf4", new ColumnVisibility("A&B&C"), 3);
 +    
-     // m3 has differnt row than m1
++    // m3 has different row than m1
 +    Mutation m3 = new Mutation("r2");
 +    
 +    m3.put("cf1", "cq1", "v1");
 +    m3.put("cf1", "cq1", new ColumnVisibility("A&B"), "v2");
 +    m3.put("cf1", "cq1", 3, "v3");
 +    m3.put("cf1", "cq1", new ColumnVisibility("A&B&C"), 4, "v4");
 +    m3.putDelete("cf2", "cf3");
 +    m3.putDelete("cf2", "cf4", 3);
 +    m3.putDelete("cf2", "cf4", new ColumnVisibility("A&B&C"), 3);
 +
 +    // m4 has a different column than m1
 +    Mutation m4 = new Mutation("r1");
 +    
 +    m4.put("cf2", "cq1", "v1");
 +    m4.put("cf1", "cq1", new ColumnVisibility("A&B"), "v2");
 +    m4.put("cf1", "cq1", 3, "v3");
 +    m4.put("cf1", "cq1", new ColumnVisibility("A&B&C"), 4, "v4");
 +    m4.putDelete("cf2", "cf3");
 +    m4.putDelete("cf2", "cf4", 3);
 +    m4.putDelete("cf2", "cf4", new ColumnVisibility("A&B&C"), 3);
 +    
 +    // m5 has a different value than m1
 +    Mutation m5 = new Mutation("r1");
 +    
 +    m5.put("cf1", "cq1", "v1");
 +    m5.put("cf1", "cq1", new ColumnVisibility("A&B"), "v2");
 +    m5.put("cf1", "cq1", 3, "v4");
 +    m5.put("cf1", "cq1", new ColumnVisibility("A&B&C"), 4, "v4");
 +    m5.putDelete("cf2", "cf3");
 +    m5.putDelete("cf2", "cf4", 3);
 +    m5.putDelete("cf2", "cf4", new ColumnVisibility("A&B&C"), 3);
 +
 +    assertEquals(m1, m1);
 +    assertEquals(m1, m2);
 +    assertEquals(m2, m1);
 +    assertFalse(m1.equals(m3));
 +    assertFalse(m3.equals(m1));
 +    assertFalse(m1.equals(m4));
 +    assertFalse(m4.equals(m1));
 +    assertFalse(m3.equals(m4));
 +    assertFalse(m1.equals(m5));
 +    assertFalse(m5.equals(m1));
 +    assertFalse(m3.equals(m5));
 +    assertFalse(m4.equals(m5));
 +  }
++
++  @Test
++  public void testThrift() {
++    // a mutation converted to thrift and back must equal the original
++    Mutation original = new Mutation("r1");
++    original.put("cf1", "cq1", "v1");
++    Mutation roundTripped = new Mutation(original.toThrift());
++    assertEquals(original, roundTripped);
++  }
++  
++  @Test(expected=IllegalArgumentException.class)
++  public void testThrift_Invalid() {
++    Mutation source = new Mutation("r1");
++    source.put("cf1", "cq1", "v1");
++    TMutation thriftMutation = source.toThrift();
++    // a thrift mutation with no row is expected to be rejected by the Mutation constructor
++    thriftMutation.setRow((byte[]) null);
++    new Mutation(thriftMutation);
++  }
++
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a5e3ed3b/core/src/test/java/org/apache/accumulo/core/data/OldMutation.java
----------------------------------------------------------------------
diff --cc core/src/test/java/org/apache/accumulo/core/data/OldMutation.java
index 786adce,0000000..5f6f993
mode 100644,000000..100644
--- a/core/src/test/java/org/apache/accumulo/core/data/OldMutation.java
+++ b/core/src/test/java/org/apache/accumulo/core/data/OldMutation.java
@@@ -1,492 -1,0 +1,499 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.data;
 +
 +import java.io.DataInput;
 +import java.io.DataOutput;
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Collections;
 +import java.util.List;
 +
 +import org.apache.accumulo.core.data.thrift.TMutation;
 +import org.apache.accumulo.core.security.ColumnVisibility;
 +import org.apache.accumulo.core.util.ByteBufferUtil;
 +import org.apache.accumulo.core.util.TextUtil;
 +import org.apache.hadoop.io.Text;
 +import org.apache.hadoop.io.Writable;
 +
 +/**
 + * Will read/write old mutations.
 + * 
 + * <p>
 + * Column updates are appended to an in-memory buffer until the mutation is serialized (by reading its updates, sizing it, writing it, or converting it to
 + * Thrift). After that point no further updates can be added. Values of at least {@link #VALUE_SIZE_COPY_CUTOFF} bytes are not stored inline in the serialized
 + * data; they are copied into a separate list and referenced from the data by a negative, one-based index.
 + */
 +public class OldMutation implements Writable {
 +  
 +  /** Values of this many bytes or more are kept out of line in {@link #values}. */
 +  static final int VALUE_SIZE_COPY_CUTOFF = 1 << 15;
 +  
 +  private byte[] row;
 +  // serialized column updates; only valid once serialize() has run
 +  private byte[] data;
 +  // number of column updates contained in data
 +  private int entries;
 +  // out-of-line (large) values, or null if there are none
 +  private List<byte[]> values;
 +  
 +  // created this little class instead of using ByteArrayOutput stream and DataOutputStream
 +  // because both are synchronized... lots of small syncs slow things down
 +  private static class ByteBuffer {
 +    
 +    int offset;
 +    byte data[] = new byte[64];
 +    
 +    /** Grows the backing array (by repeated doubling) so that at least {@code l} more bytes fit. */
 +    private void reserve(int l) {
 +      if (offset + l > data.length) {
 +        int newSize = data.length * 2;
 +        while (newSize <= offset + l)
 +          newSize = newSize * 2;
 +        
 +        byte[] newData = new byte[newSize];
 +        System.arraycopy(data, 0, newData, 0, offset);
 +        data = newData;
 +      }
 +      
 +    }
 +    
 +    void add(byte[] b) {
 +      reserve(b.length);
 +      System.arraycopy(b, 0, data, offset, b.length);
 +      offset += b.length;
 +    }
 +    
 +    public void add(byte[] bytes, int off, int length) {
 +      reserve(length);
 +      System.arraycopy(bytes, off, data, offset, length);
 +      offset += length;
 +    }
 +    
 +    /** Appends a single byte: 1 for true, 0 for false. */
 +    void add(boolean b) {
 +      reserve(1);
 +      if (b)
 +        data[offset++] = 1;
 +      else
 +        data[offset++] = 0;
 +    }
 +    
 +    /** Appends eight bytes, most significant byte first. */
 +    void add(long v) {
 +      reserve(8);
 +      data[offset++] = (byte) (v >>> 56);
 +      data[offset++] = (byte) (v >>> 48);
 +      data[offset++] = (byte) (v >>> 40);
 +      data[offset++] = (byte) (v >>> 32);
 +      data[offset++] = (byte) (v >>> 24);
 +      data[offset++] = (byte) (v >>> 16);
 +      data[offset++] = (byte) (v >>> 8);
 +      data[offset++] = (byte) (v >>> 0);
 +    }
 +    
 +    /** Appends four bytes, most significant byte first. */
 +    void add(int i) {
 +      reserve(4);
 +      data[offset++] = (byte) (i >>> 24);
 +      data[offset++] = (byte) (i >>> 16);
 +      data[offset++] = (byte) (i >>> 8);
 +      data[offset++] = (byte) (i >>> 0);
 +    }
 +    
 +    /** Returns a copy of the bytes written so far. */
 +    public byte[] toArray() {
 +      byte ret[] = new byte[offset];
 +      System.arraycopy(data, 0, ret, 0, offset);
 +      return ret;
 +    }
 +    
 +  }
 +  
 +  /** Sequential reader over a byte array; the counterpart of the ByteBuffer writer above. */
 +  private static class SimpleReader {
 +    int offset;
 +    byte data[];
 +    
 +    SimpleReader(byte b[]) {
 +      this.data = b;
 +    }
 +    
 +    /** Reads four bytes, most significant first. Only the leading byte keeps its sign. */
 +    int readInt() {
 +      return (data[offset++] << 24) + ((data[offset++] & 255) << 16) + ((data[offset++] & 255) << 8) + ((data[offset++] & 255) << 0);
 +      
 +    }
 +    
 +    /** Reads eight bytes, most significant first. Only the leading byte keeps its sign. */
 +    long readLong() {
 +      return (((long) data[offset++] << 56) + ((long) (data[offset++] & 255) << 48) + ((long) (data[offset++] & 255) << 40)
 +          + ((long) (data[offset++] & 255) << 32) + ((long) (data[offset++] & 255) << 24) + ((data[offset++] & 255) << 16) + ((data[offset++] & 255) << 8) + ((data[offset++] & 255) << 0));
 +    }
 +    
 +    /** Fills {@code b} completely from the current position. */
 +    void readBytes(byte b[]) {
 +      System.arraycopy(data, offset, b, 0, b.length);
 +      offset += b.length;
 +    }
 +    
 +    boolean readBoolean() {
 +      return (data[offset++] == 1);
 +    }
 +    
 +  }
 +  
 +  // non-null only while the mutation still accepts updates
 +  private ByteBuffer buffer;
 +  
 +  // lazily decoded view of data; reset by readFields
 +  private List<ColumnUpdate> updates;
 +  
 +  private static final byte[] EMPTY_BYTES = new byte[0];
 +  
 +  /** Freezes the pending buffer into {@link #data}. Does nothing if that already happened. */
 +  private void serialize() {
 +    if (buffer != null) {
 +      data = buffer.toArray();
 +      buffer = null;
 +    }
 +  }
 +  
 +  /**
 +   * Creates an empty mutation for the given row. The row bytes are copied.
 +   */
 +  public OldMutation(Text row) {
 +    this.row = new byte[row.getLength()];
 +    System.arraycopy(row.getBytes(), 0, this.row, 0, row.getLength());
 +    buffer = new ByteBuffer();
 +  }
 +  
 +  public OldMutation(CharSequence row) {
 +    this(new Text(row.toString()));
 +  }
 +  
 +  /**
 +   * Creates an uninitialized mutation, to be populated with {@link #readFields(DataInput)}.
 +   */
 +  public OldMutation() {}
 +  
 +  /**
 +   * Creates a mutation from its Thrift form.
 +   * 
 +   * @throws IllegalArgumentException
 +   *           if the row or the serialized data is null after conversion
 +   */
 +  public OldMutation(TMutation tmutation) {
 +    this.row = ByteBufferUtil.toBytes(tmutation.row);
 +    this.data = ByteBufferUtil.toBytes(tmutation.data);
 +    this.entries = tmutation.entries;
 +    this.values = ByteBufferUtil.toBytesList(tmutation.values);
 +
 +    if (this.row == null) {
 +      throw new IllegalArgumentException("null row");
 +    }
 +    if (this.data == null) {
 +      throw new IllegalArgumentException("null serialized data");
 +    }
 +  }
 +  
 +  public byte[] getRow() {
 +    return row;
 +  }
 +  
 +  // length-prefixed byte array
 +  private void put(byte b[]) {
 +    buffer.add(b.length);
 +    buffer.add(b);
 +  }
 +  
 +  // length-prefixed text; only the first getLength() bytes of the backing array are valid
 +  private void put(Text t) {
 +    buffer.add(t.getLength());
 +    buffer.add(t.getBytes(), 0, t.getLength());
 +  }
 +  
 +  private void put(boolean b) {
 +    buffer.add(b);
 +  }
 +  
 +  private void put(int i) {
 +    buffer.add(i);
 +  }
 +  
 +  private void put(long l) {
 +    buffer.add(l);
 +  }
 +  
 +  /**
 +   * Appends one column update to the pending buffer. All public put/putDelete overloads end up here.
 +   * 
 +   * @throws IllegalStateException
 +   *           if the mutation has already been serialized
 +   */
 +  private void put(Text cf, Text cq, byte[] cv, boolean hasts, long ts, boolean deleted, byte[] val) {
 +    
 +    if (buffer == null)
 +      throw new IllegalStateException("Can not add to mutation after serializing it");
 +    
 +    put(cf);
 +    put(cq);
 +    put(cv);
 +    put(hasts);
 +    put(ts);
 +    put(deleted);
 +    
 +    if (val.length < VALUE_SIZE_COPY_CUTOFF) {
 +      put(val);
 +    } else {
 +      if (values == null)
 +        values = new ArrayList<byte[]>();
 +      byte copy[] = new byte[val.length];
 +      System.arraycopy(val, 0, copy, 0, val.length);
 +      values.add(copy);
 +      // a negative "length" is a one-based index into values
 +      put(-1 * values.size());
 +      // the cached total of out-of-line value lengths is now stale
 +      cachedValLens = -1;
 +    }
 +    
 +    entries++;
 +  }
 +  
 +  private void put(CharSequence cf, CharSequence cq, byte[] cv, boolean hasts, long ts, boolean deleted, byte[] val) {
 +    put(new Text(cf.toString()), new Text(cq.toString()), cv, hasts, ts, deleted, val);
 +  }
 +  
 +  private void put(CharSequence cf, CharSequence cq, byte[] cv, boolean hasts, long ts, boolean deleted, CharSequence val) {
 +    put(cf, cq, cv, hasts, ts, deleted, TextUtil.getBytes(new Text(val.toString())));
 +  }
 +  
 +  public void put(Text columnFamily, Text columnQualifier, Value value) {
 +    put(columnFamily, columnQualifier, EMPTY_BYTES, false, 0L, false, value.get());
 +  }
 +  
 +  public void put(Text columnFamily, Text columnQualifier, ColumnVisibility columnVisibility, Value value) {
 +    put(columnFamily, columnQualifier, columnVisibility.getExpression(), false, 0L, false, value.get());
 +  }
 +  
 +  public void put(Text columnFamily, Text columnQualifier, long timestamp, Value value) {
 +    put(columnFamily, columnQualifier, EMPTY_BYTES, true, timestamp, false, value.get());
 +  }
 +  
 +  public void put(Text columnFamily, Text columnQualifier, ColumnVisibility columnVisibility, long timestamp, Value value) {
 +    put(columnFamily, columnQualifier, columnVisibility.getExpression(), true, timestamp, false, value.get());
 +  }
 +  
 +  public void putDelete(Text columnFamily, Text columnQualifier) {
 +    put(columnFamily, columnQualifier, EMPTY_BYTES, false, 0L, true, EMPTY_BYTES);
 +  }
 +  
 +  public void putDelete(Text columnFamily, Text columnQualifier, ColumnVisibility columnVisibility) {
 +    put(columnFamily, columnQualifier, columnVisibility.getExpression(), false, 0L, true, EMPTY_BYTES);
 +  }
 +  
 +  public void putDelete(Text columnFamily, Text columnQualifier, long timestamp) {
 +    put(columnFamily, columnQualifier, EMPTY_BYTES, true, timestamp, true, EMPTY_BYTES);
 +  }
 +  
 +  public void putDelete(Text columnFamily, Text columnQualifier, ColumnVisibility columnVisibility, long timestamp) {
 +    put(columnFamily, columnQualifier, columnVisibility.getExpression(), true, timestamp, true, EMPTY_BYTES);
 +  }
 +  
 +  public void put(CharSequence columnFamily, CharSequence columnQualifier, Value value) {
 +    put(columnFamily, columnQualifier, EMPTY_BYTES, false, 0L, false, value.get());
 +  }
 +  
 +  public void put(CharSequence columnFamily, CharSequence columnQualifier, ColumnVisibility columnVisibility, Value value) {
 +    put(columnFamily, columnQualifier, columnVisibility.getExpression(), false, 0L, false, value.get());
 +  }
 +  
 +  public void put(CharSequence columnFamily, CharSequence columnQualifier, long timestamp, Value value) {
 +    put(columnFamily, columnQualifier, EMPTY_BYTES, true, timestamp, false, value.get());
 +  }
 +  
 +  public void put(CharSequence columnFamily, CharSequence columnQualifier, ColumnVisibility columnVisibility, long timestamp, Value value) {
 +    put(columnFamily, columnQualifier, columnVisibility.getExpression(), true, timestamp, false, value.get());
 +  }
 +  
 +  public void putDelete(CharSequence columnFamily, CharSequence columnQualifier) {
 +    put(columnFamily, columnQualifier, EMPTY_BYTES, false, 0L, true, EMPTY_BYTES);
 +  }
 +  
 +  public void putDelete(CharSequence columnFamily, CharSequence columnQualifier, ColumnVisibility columnVisibility) {
 +    put(columnFamily, columnQualifier, columnVisibility.getExpression(), false, 0L, true, EMPTY_BYTES);
 +  }
 +  
 +  public void putDelete(CharSequence columnFamily, CharSequence columnQualifier, long timestamp) {
 +    put(columnFamily, columnQualifier, EMPTY_BYTES, true, timestamp, true, EMPTY_BYTES);
 +  }
 +  
 +  public void putDelete(CharSequence columnFamily, CharSequence columnQualifier, ColumnVisibility columnVisibility, long timestamp) {
 +    put(columnFamily, columnQualifier, columnVisibility.getExpression(), true, timestamp, true, EMPTY_BYTES);
 +  }
 +  
 +  public void put(CharSequence columnFamily, CharSequence columnQualifier, CharSequence value) {
 +    put(columnFamily, columnQualifier, EMPTY_BYTES, false, 0L, false, value);
 +  }
 +  
 +  public void put(CharSequence columnFamily, CharSequence columnQualifier, ColumnVisibility columnVisibility, CharSequence value) {
 +    put(columnFamily, columnQualifier, columnVisibility.getExpression(), false, 0L, false, value);
 +  }
 +  
 +  public void put(CharSequence columnFamily, CharSequence columnQualifier, long timestamp, CharSequence value) {
 +    put(columnFamily, columnQualifier, EMPTY_BYTES, true, timestamp, false, value);
 +  }
 +  
 +  public void put(CharSequence columnFamily, CharSequence columnQualifier, ColumnVisibility columnVisibility, long timestamp, CharSequence value) {
 +    put(columnFamily, columnQualifier, columnVisibility.getExpression(), true, timestamp, false, value);
 +  }
 +  
 +  // reads a length-prefixed byte array; zero length yields the shared empty array
 +  private byte[] readBytes(SimpleReader in) {
 +    int len = in.readInt();
 +    if (len == 0)
 +      return EMPTY_BYTES;
 +    
 +    byte bytes[] = new byte[len];
 +    in.readBytes(bytes);
 +    return bytes;
 +  }
 +  
 +  /**
 +   * Serializes the mutation (if it has not been already) and returns its column updates. The decoded list is cached and reused by later calls.
 +   * 
 +   * @return the column updates, in the order they were added
 +   */
 +  public List<ColumnUpdate> getUpdates() {
 +    serialize();
 +    
 +    if (updates == null) {
 +      // only build a reader when there is actually something to decode
 +      SimpleReader in = new SimpleReader(data);
 +      
 +      if (entries == 1) {
 +        updates = Collections.singletonList(deserializeColumnUpdate(in));
 +      } else {
 +        ColumnUpdate[] tmpUpdates = new ColumnUpdate[entries];
 +        
 +        for (int i = 0; i < entries; i++)
 +          tmpUpdates[i] = deserializeColumnUpdate(in);
 +        
 +        updates = Arrays.asList(tmpUpdates);
 +      }
 +    }
 +    
 +    return updates;
 +  }
 +  
 +  // decodes one column update, in the same field order that put(...) writes it
 +  private ColumnUpdate deserializeColumnUpdate(SimpleReader in) {
 +    byte[] cf = readBytes(in);
 +    byte[] cq = readBytes(in);
 +    byte[] cv = readBytes(in);
 +    boolean hasts = in.readBoolean();
 +    long ts = in.readLong();
 +    boolean deleted = in.readBoolean();
 +    
 +    byte[] val;
 +    int valLen = in.readInt();
 +    
 +    if (valLen < 0) {
 +      // negative length: one-based index of an out-of-line value
 +      val = values.get((-1 * valLen) - 1);
 +    } else if (valLen == 0) {
 +      val = EMPTY_BYTES;
 +    } else {
 +      val = new byte[valLen];
 +      in.readBytes(val);
 +    }
 +    
 +    return new ColumnUpdate(cf, cq, cv, hasts, ts, deleted, val);
 +  }
 +  
 +  // cached result of getValueLengths(), or -1 when it has to be recomputed
 +  private long cachedValLens = -1;
 +  
 +  /**
 +   * @return the combined length of all out-of-line values, or 0 if there are none
 +   */
 +  long getValueLengths() {
 +    if (values == null)
 +      return 0;
 +    
 +    if (cachedValLens == -1) {
 +      // accumulate in a long: several large values can exceed the int range
 +      long total = 0;
 +      for (byte[] val : values)
 +        total += val.length;
 +      
 +      cachedValLens = total;
 +    }
 +    
 +    return cachedValLens;
 +    
 +  }
 +  
 +  /**
 +   * Serializes the mutation (if it has not been already) and returns the size of the row, the serialized data and all out-of-line values.
 +   */
 +  public long numBytes() {
 +    serialize();
 +    return row.length + data.length + getValueLengths();
 +  }
 +  
 +  public long estimatedMemoryUsed() {
 +    return numBytes() + 230;
 +  }
 +  
 +  /**
 +   * @return the number of column value pairs added to the mutation
 +   */
 +  public int size() {
 +    return entries;
 +  }
 +  
 +  @Override
 +  public void readFields(DataInput in) throws IOException {
 +    // Clear out cached column updates and value lengths so
 +    // that we recalculate them based on the (potentially) new
 +    // data we are about to read in.
 +    updates = null;
 +    cachedValLens = -1;
 +    buffer = null;
 +    
 +    int len = in.readInt();
 +    row = new byte[len];
 +    in.readFully(row);
 +    len = in.readInt();
 +    data = new byte[len];
 +    in.readFully(data);
 +    entries = in.readInt();
 +    
 +    boolean valuesPresent = in.readBoolean();
 +    if (!valuesPresent) {
 +      values = null;
 +    } else {
 +      values = new ArrayList<byte[]>();
 +      int numValues = in.readInt();
 +      for (int i = 0; i < numValues; i++) {
 +        len = in.readInt();
 +        byte val[] = new byte[len];
 +        in.readFully(val);
 +        values.add(val);
 +      }
 +    }
 +  }
 +  
 +  @Override
 +  public void write(DataOutput out) throws IOException {
 +    serialize();
 +    out.writeInt(row.length);
 +    out.write(row);
 +    out.writeInt(data.length);
 +    out.write(data);
 +    out.writeInt(entries);
 +    
 +    if (values == null)
 +      out.writeBoolean(false);
 +    else {
 +      out.writeBoolean(true);
 +      out.writeInt(values.size());
 +      for (int i = 0; i < values.size(); i++) {
 +        byte val[] = values.get(i);
 +        out.writeInt(val.length);
 +        out.write(val);
 +      }
 +    }
 +    
 +  }
 +  
 +  @Override
 +  public boolean equals(Object o) {
 +    if (o instanceof OldMutation)
 +      return equals((OldMutation) o);
 +    return false;
 +  }
 +  
 +  @Override
 +  public int hashCode() {
 +    return toThrift().hashCode();
 +  }
 +  
 +  /**
 +   * Compares this mutation with another by row and by column updates, in order. Both mutations are serialized as a side effect.
 +   * 
 +   * @param m
 +   *          the mutation to compare against; may be null
 +   * @return true if the rows match and both mutations hold equal column updates in the same order
 +   */
 +  public boolean equals(OldMutation m) {
 +    if (m == null)
 +      return false;
 +    serialize();
 +    if (!Arrays.equals(row, m.getRow()))
 +      return false;
 +    List<ColumnUpdate> oldcus = this.getUpdates();
 +    List<ColumnUpdate> newcus = m.getUpdates();
 +    if (oldcus.size() != newcus.size())
 +      return false;
 +    for (int i = 0; i < newcus.size(); i++) {
 +      ColumnUpdate oldcu = oldcus.get(i);
 +      ColumnUpdate newcu = newcus.get(i);
 +      if (!oldcu.equals(newcu))
 +        return false;
 +    }
 +    // every row byte and every column update matched
 +    return true;
 +  }
 +  
 +  /**
 +   * Serializes the mutation (if it has not been already) and wraps its row, data, out-of-line values and entry count in a Thrift object.
 +   */
 +  public TMutation toThrift() {
 +    serialize();
 +    return new TMutation(java.nio.ByteBuffer.wrap(row), java.nio.ByteBuffer.wrap(data), ByteBufferUtil.toByteBuffers(values), entries);
 +  }
 +  
 +}