You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2019/03/22 00:28:24 UTC

[hbase] branch branch-1 updated: HBASE-22034 Backport HBASE-21401 and HBASE-22032 to branch-1

This is an automated email from the ASF dual-hosted git repository.

apurtell pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-1 by this push:
     new 953e0f5  HBASE-22034 Backport HBASE-21401 and HBASE-22032 to branch-1
953e0f5 is described below

commit 953e0f5f773f194a774ab4224999dc20e5ffcf19
Author: Andrew Purtell <ap...@apache.org>
AuthorDate: Fri Mar 15 14:04:29 2019 -0700

    HBASE-22034 Backport HBASE-21401 and HBASE-22032 to branch-1
    
    HBASE-21401 Sanity check when constructing the KeyValue
    
    HBASE-22032 KeyValue validation should check for null byte array
---
 .../java/org/apache/hadoop/hbase/CellUtil.java     |  20 +
 .../java/org/apache/hadoop/hbase/KeyValue.java     |  16 +-
 .../java/org/apache/hadoop/hbase/KeyValueUtil.java | 192 +++++++-
 .../apache/hadoop/hbase/codec/KeyValueCodec.java   |   4 +-
 .../hadoop/hbase/codec/KeyValueCodecWithTags.java  |   4 +-
 .../java/org/apache/hadoop/hbase/TestKeyValue.java | 541 ++++++++-------------
 .../hbase/io/encoding/TestDataBlockEncoders.java   |  30 +-
 .../hadoop/hbase/io/hfile/TestCacheOnWrite.java    |   4 +-
 .../hadoop/hbase/io/hfile/TestHFileSeek.java       |   8 +-
 9 files changed, 427 insertions(+), 392 deletions(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
index 981fad8..2b43b00 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
@@ -673,6 +673,26 @@ public final class CellUtil {
     };
   }
 
+  public static byte[] cloneTags(Cell cell) {
+    byte[] output = new byte[cell.getTagsLength()];
+    copyTagsTo(cell, output, 0);
+    return output;
+  }
+
+  /**
+   * Copies the cell's tags into the destination byte array.
+   * @param cell the cell
+   * @param destination byte array that will receive tag data
+   * @param destinationOffset start offset in byte array that will receive tag data
+   * @return position after tags
+   */
+  public static int copyTagsTo(Cell cell, byte[] destination, int destinationOffset) {
+    int tlen = cell.getTagsLength();
+    System.arraycopy(cell.getTagsArray(), cell.getTagsOffset(), destination,
+      destinationOffset, tlen);
+    return destinationOffset + tlen;
+  }
+
   /**
    * Returns true if the first range start1...end1 overlaps with the second range
    * start2...end2, assuming the byte arrays represent row keys
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
index 2b09faf..1f84d62 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
@@ -252,6 +252,15 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
     }
 
     /**
+     * Checks whether the byte b is a valid type.
+     * @param b byte to check
+     * @return true or false
+     */
+    static boolean isValidType(byte b) {
+      return codeArray[b & 0xff] != null;
+    }
+
+    /**
      * Cannot rely on enum ordinals . They change if item is removed or moved.
      * Do our own codes.
      * @param b
@@ -346,7 +355,8 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
    * @param offset offset to start of the KeyValue
    * @param length length of the KeyValue
    */
-  public KeyValue(final byte [] bytes, final int offset, final int length) {
+  public KeyValue(final byte[] bytes, final int offset, final int length) {
+    KeyValueUtil.checkKeyValueBytes(bytes, offset, length, true);
     this.bytes = bytes;
     this.offset = offset;
     this.length = length;
@@ -2510,11 +2520,11 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
    * @param in
    * @return Created KeyValue or throws an exception
    * @throws IOException
-   * @deprecated Use {@link KeyValueUtil#iscreate(InputStream, boolean)}
+   * @deprecated Use {@link KeyValueUtil#createKeyValueFromInputStream(InputStream, boolean)}
    */
   @Deprecated
   public static KeyValue iscreate(final InputStream in) throws IOException {
-    return KeyValueUtil.iscreate(in, true);
+    return KeyValueUtil.createKeyValueFromInputStream(in, true);
   }
 
   /**
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java
index 6d502fd..c9fcd51 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java
@@ -581,19 +581,150 @@ public class KeyValueUtil {
     return new ArrayList<KeyValue>(lazyList);
   }
 
+  static String bytesToHex(byte[] buf, int offset, int length) {
+    String bufferContents = buf != null ? Bytes.toStringBinary(buf, offset, length) : "<null>";
+    return ", KeyValueBytesHex=" + bufferContents + ", offset=" + offset
+        + ", length=" + length;
+  }
+
+  static void checkKeyValueBytes(byte[] buf, int offset, int length, boolean withTags) {
+    if (buf == null) {
+      throw new IllegalArgumentException("Invalid to have null " +
+          "byte array in KeyValue.");
+    }
+
+    int pos = offset, endOffset = offset + length;
+    // check the key
+    if (pos + Bytes.SIZEOF_INT > endOffset) {
+      throw new IllegalArgumentException(
+          "Overflow when reading key length at position=" + pos + bytesToHex(buf, offset, length));
+    }
+    int keyLen = Bytes.toInt(buf, pos, Bytes.SIZEOF_INT);
+    pos += Bytes.SIZEOF_INT;
+    if (keyLen <= 0 || pos + keyLen > endOffset) {
+      throw new IllegalArgumentException(
+          "Invalid key length in KeyValue. keyLength=" + keyLen + bytesToHex(buf, offset, length));
+    }
+    // check the value
+    if (pos + Bytes.SIZEOF_INT > endOffset) {
+      throw new IllegalArgumentException("Overflow when reading value length at position=" + pos
+          + bytesToHex(buf, offset, length));
+    }
+    int valLen = Bytes.toInt(buf, pos, Bytes.SIZEOF_INT);
+    pos += Bytes.SIZEOF_INT;
+    if (valLen < 0 || pos + valLen > endOffset) {
+      throw new IllegalArgumentException("Invalid value length in KeyValue, valueLength=" + valLen
+          + bytesToHex(buf, offset, length));
+    }
+    // check the row
+    if (pos + Bytes.SIZEOF_SHORT > endOffset) {
+      throw new IllegalArgumentException(
+          "Overflow when reading row length at position=" + pos + bytesToHex(buf, offset, length));
+    }
+    short rowLen = Bytes.toShort(buf, pos, Bytes.SIZEOF_SHORT);
+    pos += Bytes.SIZEOF_SHORT;
+    if (rowLen < 0 || pos + rowLen > endOffset) {
+      throw new IllegalArgumentException(
+          "Invalid row length in KeyValue, rowLength=" + rowLen + bytesToHex(buf, offset, length));
+    }
+    pos += rowLen;
+    // check the family
+    if (pos + Bytes.SIZEOF_BYTE > endOffset) {
+      throw new IllegalArgumentException("Overflow when reading family length at position=" + pos
+          + bytesToHex(buf, offset, length));
+    }
+    int familyLen = buf[pos];
+    pos += Bytes.SIZEOF_BYTE;
+    if (familyLen < 0 || pos + familyLen > endOffset) {
+      throw new IllegalArgumentException("Invalid family length in KeyValue, familyLength="
+          + familyLen + bytesToHex(buf, offset, length));
+    }
+    pos += familyLen;
+    // check the qualifier
+    int qualifierLen = keyLen - Bytes.SIZEOF_SHORT - rowLen - Bytes.SIZEOF_BYTE - familyLen
+        - Bytes.SIZEOF_LONG - Bytes.SIZEOF_BYTE;
+    if (qualifierLen < 0 || pos + qualifierLen > endOffset) {
+      throw new IllegalArgumentException("Invalid qualifier length in KeyValue, qualifierLen="
+          + qualifierLen + bytesToHex(buf, offset, length));
+    }
+    pos += qualifierLen;
+    // check the timestamp
+    if (pos + Bytes.SIZEOF_LONG > endOffset) {
+      throw new IllegalArgumentException(
+          "Overflow when reading timestamp at position=" + pos + bytesToHex(buf, offset, length));
+    }
+    long timestamp = Bytes.toLong(buf, pos, Bytes.SIZEOF_LONG);
+    if (timestamp < 0) {
+      throw new IllegalArgumentException(
+          "Timestamp cannot be negative, ts=" + timestamp + bytesToHex(buf, offset, length));
+    }
+    pos += Bytes.SIZEOF_LONG;
+    // check the type
+    if (pos + Bytes.SIZEOF_BYTE > endOffset) {
+      throw new IllegalArgumentException(
+          "Overflow when reading type at position=" + pos + bytesToHex(buf, offset, length));
+    }
+    byte type = buf[pos];
+    if (!Type.isValidType(type)) {
+      throw new IllegalArgumentException(
+          "Invalid type in KeyValue, type=" + type + bytesToHex(buf, offset, length));
+    }
+    pos += Bytes.SIZEOF_BYTE;
+    // check that the value bytes fit within the buffer
+    if (pos + valLen > endOffset) {
+      throw new IllegalArgumentException(
+          "Overflow when reading value part at position=" + pos + bytesToHex(buf, offset, length));
+    }
+    pos += valLen;
+    // check the tags
+    if (withTags) {
+      if (pos == endOffset) {
+        // withTags is true but no tag in the cell.
+        return;
+      }
+      if (pos + Bytes.SIZEOF_SHORT > endOffset) {
+        throw new IllegalArgumentException("Overflow when reading tags length at position=" + pos
+            + bytesToHex(buf, offset, length));
+      }
+      short tagsLen = Bytes.toShort(buf, pos);
+      pos += Bytes.SIZEOF_SHORT;
+      if (tagsLen < 0 || pos + tagsLen > endOffset) {
+        throw new IllegalArgumentException("Invalid tags length in KeyValue at position="
+            + (pos - Bytes.SIZEOF_SHORT) + bytesToHex(buf, offset, length));
+      }
+      int tagsEndOffset = pos + tagsLen;
+      for (; pos < tagsEndOffset;) {
+        if (pos + Tag.TAG_LENGTH_SIZE > endOffset) {
+          throw new IllegalArgumentException("Overflow when reading tag length at position=" + pos
+              + bytesToHex(buf, offset, length));
+        }
+        short tagLen = Bytes.toShort(buf, pos);
+        pos += Tag.TAG_LENGTH_SIZE;
+        // tagLen includes the one-byte tag type, so it must be at least 1.
+        if (tagLen < 1 || pos + tagLen > endOffset) {
+          throw new IllegalArgumentException(
+              "Invalid tag length at position=" + (pos - Tag.TAG_LENGTH_SIZE) + ", tagLength="
+                  + tagLen + bytesToHex(buf, offset, length));
+        }
+        pos += tagLen;
+      }
+    }
+    if (pos != endOffset) {
+      throw new IllegalArgumentException("Some redundant bytes in KeyValue's buffer, startOffset="
+          + pos + ", endOffset=" + endOffset + bytesToHex(buf, offset, length));
+    }
+  }
+
   /**
-   * Create a KeyValue reading from the raw InputStream. Named
-   * <code>iscreate</code> so doesn't clash with the <code>create(DataInput)</code> method
-   * added in 2.0
-   *
-   * @param in
-   * @param withTags
-   *          whether the keyvalue should include tags are not
-   * @return Created KeyValue OR if we find a length of zero, we will return
-   *         null which can be useful marking a stream as done.
+   * Create a KeyValue reading from the raw InputStream.
+   * @param in inputStream to read.
+   * @param withTags whether the keyvalue should include tags or not
+   * @return Created KeyValue OR if we find a length of zero, we will return null which can be
+   *         useful marking a stream as done.
    * @throws IOException
    */
-  public static KeyValue iscreate(final InputStream in, boolean withTags) throws IOException {
+  public static KeyValue createKeyValueFromInputStream(InputStream in, boolean withTags)
+      throws IOException {
     byte[] intBytes = new byte[Bytes.SIZEOF_INT];
     int bytesRead = 0;
     while (bytesRead < intBytes.length) {
@@ -606,14 +737,15 @@ public class KeyValueUtil {
       }
       bytesRead += n;
     }
-    // TODO: perhaps some sanity check is needed here.
     byte[] bytes = new byte[Bytes.toInt(intBytes)];
     IOUtils.readFully(in, bytes, 0, bytes.length);
-    if (withTags) {
-      return new KeyValue(bytes, 0, bytes.length);
-    } else {
-      return new NoTagsKeyValue(bytes, 0, bytes.length);
-    }
+    return withTags ? new KeyValue(bytes, 0, bytes.length)
+        : new NoTagsKeyValue(bytes, 0, bytes.length);
+  }
+
+  public static int getSerializedSize(Cell cell, boolean withTags) {
+    return length(cell.getRowLength(), cell.getFamilyLength(), cell.getQualifierLength(),
+      cell.getValueLength(), cell.getTagsLength(), withTags);
   }
 
   public static void oswrite(final Cell cell, final OutputStream out, final boolean withTags)
@@ -658,4 +790,32 @@ public class KeyValueUtil {
       }
     }
   }
+
+  /**
+   * @return A KeyValue made of a byte array that holds the key-only part.
+   *         Needed to convert hfile index members to KeyValues.
+   */
+  public static KeyValue createKeyValueFromKey(final byte[] b) {
+    return createKeyValueFromKey(b, 0, b.length);
+  }
+
+  /**
+   * @return A KeyValue made of a byte buffer that holds the key-only part.
+   *         Needed to convert hfile index members to KeyValues.
+   */
+  public static KeyValue createKeyValueFromKey(final ByteBuffer bb) {
+    return createKeyValueFromKey(bb.array(), bb.arrayOffset(), bb.limit());
+  }
+
+  /**
+   * @return A KeyValue made of a byte array that holds the key-only part.
+   *         Needed to convert hfile index members to KeyValues.
+   */
+  public static KeyValue createKeyValueFromKey(final byte[] b, final int o, final int l) {
+    byte[] newb = new byte[l + KeyValue.ROW_OFFSET];
+    System.arraycopy(b, o, newb, KeyValue.ROW_OFFSET, l);
+    Bytes.putInt(newb, 0, l);
+    Bytes.putInt(newb, Bytes.SIZEOF_INT, 0);
+    return new KeyValue(newb);
+  }
 }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java
index 6df9ec3..872d91f 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java
@@ -24,7 +24,6 @@ import java.io.OutputStream;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 
 /**
@@ -66,7 +65,8 @@ public class KeyValueCodec implements Codec {
 
     @Override
     protected Cell parseCell() throws IOException {
-      return KeyValueUtil.iscreate(in, false);
+      // No tags here
+      return KeyValueUtil.createKeyValueFromInputStream(in, false);
     }
   }
 
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java
index c241785..128e92e 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java
@@ -24,7 +24,6 @@ import java.io.OutputStream;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 
 /**
@@ -72,7 +71,8 @@ public class KeyValueCodecWithTags implements Codec {
 
     @Override
     protected Cell parseCell() throws IOException {
-      return KeyValueUtil.iscreate(in, true);
+      // create KeyValue with tags
+      return KeyValueUtil.createKeyValueFromInputStream(in, true);
     }
   }
 
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestKeyValue.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestKeyValue.java
index 7d3aa0e..0860636 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestKeyValue.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestKeyValue.java
@@ -18,12 +18,14 @@
  */
 package org.apache.hadoop.hbase;
 
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotEquals;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -36,11 +38,13 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.KeyValue.KVComparator;
 import org.apache.hadoop.hbase.KeyValue.MetaComparator;
-import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
-import static org.junit.Assert.assertNotEquals;
-
+@Category(SmallTests.class)
 public class TestKeyValue extends TestCase {
   private static final Log LOG = LogFactory.getLog(TestKeyValue.class);
 
@@ -51,14 +55,14 @@ public class TestKeyValue extends TestCase {
     byte [] family2 = Bytes.toBytes("abcd");
     byte [] qualifier2 = Bytes.toBytes("ef");
 
-    KeyValue aaa = new KeyValue(a, family1, qualifier1, 0L, Type.Put, a);
+    KeyValue aaa = new KeyValue(a, family1, qualifier1, 0L, KeyValue.Type.Put, a);
     assertFalse(CellUtil.matchingColumn(aaa, family2, qualifier2));
     assertTrue(CellUtil.matchingColumn(aaa, family1, qualifier1));
-    aaa = new KeyValue(a, family2, qualifier2, 0L, Type.Put, a);
+    aaa = new KeyValue(a, family2, qualifier2, 0L, KeyValue.Type.Put, a);
     assertFalse(CellUtil.matchingColumn(aaa, family1, qualifier1));
     assertTrue(CellUtil.matchingColumn(aaa, family2,qualifier2));
     byte [] nullQualifier = new byte[0];
-    aaa = new KeyValue(a, family1, nullQualifier, 0L, Type.Put, a);
+    aaa = new KeyValue(a, family1, nullQualifier, 0L, KeyValue.Type.Put, a);
     assertTrue(CellUtil.matchingColumn(aaa, family1,null));
     assertFalse(CellUtil.matchingColumn(aaa, family2,qualifier2));
   }
@@ -74,7 +78,7 @@ public class TestKeyValue extends TestCase {
     byte [] family2 = Bytes.toBytes("ab");
     byte [] qualifier2 = Bytes.toBytes("def");
 
-    KeyValue aaa = new KeyValue(a, family1, qualifier1, 0L, Type.Put, a);
+    KeyValue aaa = new KeyValue(a, family1, qualifier1, 0L, KeyValue.Type.Put, a);
     assertFalse(CellUtil.matchingColumn(aaa, family2, qualifier2));
   }
 
@@ -96,7 +100,8 @@ public class TestKeyValue extends TestCase {
   private void check(final byte [] row, final byte [] family, byte [] qualifier,
     final long timestamp, final byte [] value) {
     KeyValue kv = new KeyValue(row, family, qualifier, timestamp, value);
-    assertTrue(Bytes.compareTo(kv.getRow(), row) == 0);
+    assertTrue(Bytes.compareTo(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), row, 0,
+      row.length) == 0);
     assertTrue(CellUtil.matchingColumn(kv, family, qualifier));
     // Call toString to make sure it works.
     LOG.info(kv.toString());
@@ -257,13 +262,6 @@ public class TestKeyValue extends TestCase {
     }
   }
 
-  public void testStackedUpKeyValue() {
-    // Test multiple KeyValues in a single blob.
-
-    // TODO actually write this test!
-
-  }
-
   private final byte[] rowA = Bytes.toBytes("rowA");
   private final byte[] rowB = Bytes.toBytes("rowB");
 
@@ -282,18 +280,14 @@ public class TestKeyValue extends TestCase {
 
   private void assertKVLessWithoutRow(KeyValue.KVComparator c, int common, KeyValue less,
       KeyValue greater) {
-    int cmp = c.compareIgnoringPrefix(common, less.getBuffer(), less.getOffset()
-        + KeyValue.ROW_OFFSET, less.getKeyLength(), greater.getBuffer(),
-        greater.getOffset() + KeyValue.ROW_OFFSET, greater.getKeyLength());
+    int cmp = c.compare(less, greater);
     assertTrue(cmp < 0);
-    cmp = c.compareIgnoringPrefix(common, greater.getBuffer(), greater.getOffset()
-        + KeyValue.ROW_OFFSET, greater.getKeyLength(), less.getBuffer(),
-        less.getOffset() + KeyValue.ROW_OFFSET, less.getKeyLength());
+    cmp = c.compare(greater, less);
     assertTrue(cmp > 0);
   }
 
   public void testCompareWithoutRow() {
-    final KeyValue.KVComparator c = KeyValue.COMPARATOR;
+    final KVComparator c = new KeyValue.KVComparator();
     byte[] row = Bytes.toBytes("row");
 
     byte[] fa = Bytes.toBytes("fa");
@@ -306,15 +300,15 @@ public class TestKeyValue extends TestCase {
     long ts = 1;
 
     // 'fa:'
-    KeyValue kv_0 = new KeyValue(row, fa, qual0, ts, Type.Put);
+    KeyValue kv_0 = new KeyValue(row, fa, qual0, ts, KeyValue.Type.Put);
     // 'fami:'
-    KeyValue kv0_0 = new KeyValue(row, fami, qual0, ts, Type.Put);
+    KeyValue kv0_0 = new KeyValue(row, fami, qual0, ts, KeyValue.Type.Put);
     // 'fami:qf1'
-    KeyValue kv0_1 = new KeyValue(row, fami, qual1, ts, Type.Put);
+    KeyValue kv0_1 = new KeyValue(row, fami, qual1, ts, KeyValue.Type.Put);
     // 'fami:qf2'
-    KeyValue kv0_2 = new KeyValue(row, fami, qual2, ts, Type.Put);
+    KeyValue kv0_2 = new KeyValue(row, fami, qual2, ts, KeyValue.Type.Put);
     // 'fami1:'
-    KeyValue kv1_0 = new KeyValue(row, fami1, qual0, ts, Type.Put);
+    KeyValue kv1_0 = new KeyValue(row, fami1, qual0, ts, KeyValue.Type.Put);
 
     // 'fami:qf1' < 'fami:qf2'
     assertKVLessWithoutRow(c, 0, kv0_1, kv0_2);
@@ -340,7 +334,7 @@ public class TestKeyValue extends TestCase {
   }
 
   public void testFirstLastOnRow() {
-    final KVComparator c = KeyValue.COMPARATOR;
+    final KVComparator c = new KeyValue.KVComparator();
     long ts = 1;
     byte[] bufferA = new byte[128];
     int offsetA = 0;
@@ -352,14 +346,14 @@ public class TestKeyValue extends TestCase {
     final KeyValue firstOnRowA = KeyValueUtil.createFirstOnRow(rowA);
     final KeyValue firstOnRowABufferFamQual = KeyValueUtil.createFirstOnRow(bufferA, offsetA,
         rowA, 0, rowA.length, family, 0, family.length, qualA, 0, qualA.length);
-    final KeyValue kvA_1 = new KeyValue(rowA, null, null, ts, Type.Put);
-    final KeyValue kvA_2 = new KeyValue(rowA, family, qualA, ts, Type.Put);
+    final KeyValue kvA_1 = new KeyValue(rowA, null, null, ts, KeyValue.Type.Put);
+    final KeyValue kvA_2 = new KeyValue(rowA, family, qualA, ts, KeyValue.Type.Put);
 
     final KeyValue lastOnRowA = KeyValueUtil.createLastOnRow(rowA);
     final KeyValue firstOnRowB = KeyValueUtil.createFirstOnRow(rowB);
     final KeyValue firstOnRowBBufferFam = KeyValueUtil.createFirstOnRow(bufferB, offsetB,
         rowB, 0, rowB.length, family, 0, family.length, null, 0, 0);
-    final KeyValue kvB = new KeyValue(rowB, family, qualA, ts, Type.Put);
+    final KeyValue kvB = new KeyValue(rowB, family, qualA, ts, KeyValue.Type.Put);
 
     assertKVLess(c, firstOnRowA, firstOnRowB);
     assertKVLess(c, firstOnRowA, firstOnRowBBufferFam);
@@ -397,9 +391,10 @@ public class TestKeyValue extends TestCase {
         // keys are still the same
         assertTrue(kv1.equals(kv1ko));
         // but values are not
-        assertTrue(kv1ko.getValue().length == (useLen?Bytes.SIZEOF_INT:0));
+        assertTrue(kv1ko.getValueLength() == (useLen?Bytes.SIZEOF_INT:0));
         if (useLen) {
-          assertEquals(kv1.getValueLength(), Bytes.toInt(kv1ko.getValue()));
+          assertEquals(kv1.getValueLength(),
+            Bytes.toInt(kv1ko.getValueArray(), kv1ko.getValueOffset(), kv1ko.getValueLength()));
         }
       }
     }
@@ -414,7 +409,7 @@ public class TestKeyValue extends TestCase {
     byte[] tmpArr = new byte[initialPadding + endingPadding + keyLen];
     System.arraycopy(kv.getBuffer(), kv.getKeyOffset(), tmpArr,
         initialPadding, keyLen);
-    KeyValue kvFromKey = KeyValue.createKeyValueFromKey(tmpArr, initialPadding,
+    KeyValue kvFromKey = KeyValueUtil.createKeyValueFromKey(tmpArr, initialPadding,
         keyLen);
     assertEquals(keyLen, kvFromKey.getKeyLength());
     assertEquals(KeyValue.ROW_OFFSET + keyLen, kvFromKey.getBuffer().length);
@@ -439,82 +434,6 @@ public class TestKeyValue extends TestCase {
     assertEquals(12345L, time2);
   }
 
-  /**
-   * See HBASE-7845
-   */
-  public void testGetShortMidpointKey() {
-    final KVComparator keyComparator = KeyValue.COMPARATOR;
-    //verify that faked shorter rowkey could be generated
-    long ts = 5;
-    KeyValue kv1 = new KeyValue(Bytes.toBytes("the quick brown fox"), family, qualA, ts, Type.Put);
-    KeyValue kv2 = new KeyValue(Bytes.toBytes("the who test text"), family, qualA, ts, Type.Put);
-    byte[] newKey = keyComparator.getShortMidpointKey(kv1.getKey(), kv2.getKey());
-    assertTrue(keyComparator.compareFlatKey(kv1.getKey(), newKey) < 0);
-    assertTrue(keyComparator.compareFlatKey(newKey, kv2.getKey()) < 0);
-    short newRowLength = Bytes.toShort(newKey, 0);
-    byte[] expectedArray = Bytes.toBytes("the r");
-    Bytes.equals(newKey, KeyValue.ROW_LENGTH_SIZE, newRowLength, expectedArray, 0,
-      expectedArray.length);
-
-    //verify: same with "row + family + qualifier", return rightKey directly
-    kv1 = new KeyValue(Bytes.toBytes("ilovehbase"), family, qualA, 5, Type.Put);
-    kv2 = new KeyValue(Bytes.toBytes("ilovehbase"), family, qualA, 0, Type.Put);
-    assertTrue(keyComparator.compareFlatKey(kv1.getKey(), kv2.getKey()) < 0);
-    newKey = keyComparator.getShortMidpointKey(kv1.getKey(), kv2.getKey());
-    assertTrue(keyComparator.compareFlatKey(kv1.getKey(), newKey) < 0);
-    assertTrue(keyComparator.compareFlatKey(newKey, kv2.getKey()) == 0);
-    kv1 = new KeyValue(Bytes.toBytes("ilovehbase"), family, qualA, -5, Type.Put);
-    kv2 = new KeyValue(Bytes.toBytes("ilovehbase"), family, qualA, -10, Type.Put);
-    assertTrue(keyComparator.compareFlatKey(kv1.getKey(), kv2.getKey()) < 0);
-    newKey = keyComparator.getShortMidpointKey(kv1.getKey(), kv2.getKey());
-    assertTrue(keyComparator.compareFlatKey(kv1.getKey(), newKey) < 0);
-    assertTrue(keyComparator.compareFlatKey(newKey, kv2.getKey()) == 0);
-
-    // verify: same with row, different with qualifier
-    kv1 = new KeyValue(Bytes.toBytes("ilovehbase"), family, qualA, 5, Type.Put);
-    kv2 = new KeyValue(Bytes.toBytes("ilovehbase"), family, qualB, 5, Type.Put);
-    assertTrue(keyComparator.compareFlatKey(kv1.getKey(), kv2.getKey()) < 0);
-    newKey = keyComparator.getShortMidpointKey(kv1.getKey(), kv2.getKey());
-    assertTrue(keyComparator.compareFlatKey(kv1.getKey(), newKey) < 0);
-    assertTrue(keyComparator.compareFlatKey(newKey, kv2.getKey()) < 0);
-    KeyValue newKeyValue = KeyValue.createKeyValueFromKey(newKey);
-    assertTrue(Arrays.equals(newKeyValue.getFamily(),family));
-    assertTrue(Arrays.equals(newKeyValue.getQualifier(),qualB));
-    assertTrue(newKeyValue.getTimestamp() == HConstants.LATEST_TIMESTAMP);
-    assertTrue(newKeyValue.getTypeByte() == Type.Maximum.getCode());
-
-    //verify metaKeyComparator's getShortMidpointKey output
-    final KVComparator metaKeyComparator = KeyValue.META_COMPARATOR;
-    kv1 = new KeyValue(Bytes.toBytes("ilovehbase123"), family, qualA, 5, Type.Put);
-    kv2 = new KeyValue(Bytes.toBytes("ilovehbase234"), family, qualA, 0, Type.Put);
-    newKey = metaKeyComparator.getShortMidpointKey(kv1.getKey(), kv2.getKey());
-    assertTrue(metaKeyComparator.compareFlatKey(kv1.getKey(), newKey) < 0);
-    assertTrue(metaKeyComparator.compareFlatKey(newKey, kv2.getKey()) == 0);
-
-    //verify common fix scenario
-    kv1 = new KeyValue(Bytes.toBytes("ilovehbase"), family, qualA, ts, Type.Put);
-    kv2 = new KeyValue(Bytes.toBytes("ilovehbaseandhdfs"), family, qualA, ts, Type.Put);
-    assertTrue(keyComparator.compareFlatKey(kv1.getKey(), kv2.getKey()) < 0);
-    newKey = keyComparator.getShortMidpointKey(kv1.getKey(), kv2.getKey());
-    assertTrue(keyComparator.compareFlatKey(kv1.getKey(), newKey) < 0);
-    assertTrue(keyComparator.compareFlatKey(newKey, kv2.getKey()) < 0);
-    newRowLength = Bytes.toShort(newKey, 0);
-    expectedArray = Bytes.toBytes("ilovehbasea");
-    Bytes.equals(newKey, KeyValue.ROW_LENGTH_SIZE, newRowLength, expectedArray, 0,
-      expectedArray.length);
-    //verify only 1 offset scenario
-    kv1 = new KeyValue(Bytes.toBytes("100abcdefg"), family, qualA, ts, Type.Put);
-    kv2 = new KeyValue(Bytes.toBytes("101abcdefg"), family, qualA, ts, Type.Put);
-    assertTrue(keyComparator.compareFlatKey(kv1.getKey(), kv2.getKey()) < 0);
-    newKey = keyComparator.getShortMidpointKey(kv1.getKey(), kv2.getKey());
-    assertTrue(keyComparator.compareFlatKey(kv1.getKey(), newKey) < 0);
-    assertTrue(keyComparator.compareFlatKey(newKey, kv2.getKey()) < 0);
-    newRowLength = Bytes.toShort(newKey, 0);
-    expectedArray = Bytes.toBytes("101");
-    Bytes.equals(newKey, KeyValue.ROW_LENGTH_SIZE, newRowLength, expectedArray, 0,
-      expectedArray.length);
-  }
-
   public void testKVsWithTags() {
     byte[] row = Bytes.toBytes("myRow");
     byte[] cf = Bytes.toBytes("myCF");
@@ -525,10 +444,14 @@ public class TestKeyValue extends TestCase {
     KeyValue kv = new KeyValue(row, cf, q, HConstants.LATEST_TIMESTAMP, value, new Tag[] {
         new Tag((byte) 1, metaValue1), new Tag((byte) 2, metaValue2) });
     assertTrue(kv.getTagsLength() > 0);
-    assertTrue(Bytes.equals(kv.getRow(), row));
-    assertTrue(Bytes.equals(kv.getFamily(), cf));
-    assertTrue(Bytes.equals(kv.getQualifier(), q));
-    assertTrue(Bytes.equals(kv.getValue(), value));
+    assertTrue(Bytes.equals(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), row, 0,
+      row.length));
+    assertTrue(Bytes.equals(kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(), cf, 0,
+      cf.length));
+    assertTrue(Bytes.equals(kv.getQualifierArray(), kv.getQualifierOffset(),
+      kv.getQualifierLength(), q, 0, q.length));
+    assertTrue(Bytes.equals(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength(), value, 0,
+      value.length));
     List<Tag> tags = kv.getTags();
     assertNotNull(tags);
     assertEquals(2, tags.size());
@@ -547,8 +470,7 @@ public class TestKeyValue extends TestCase {
     assertTrue(meta1Ok);
     assertTrue(meta2Ok);
     Iterator<Tag> tagItr = CellUtil.tagsIterator(kv.getTagsArray(), kv.getTagsOffset(),
-        kv.getTagsLength());
-    //Iterator<Tag> tagItr = kv.tagsIterator();
+      kv.getTagsLength());
     assertTrue(tagItr.hasNext());
     Tag next = tagItr.next();
     assertEquals(10, next.getTagLength());
@@ -562,7 +484,7 @@ public class TestKeyValue extends TestCase {
     assertFalse(tagItr.hasNext());
 
     tagItr = CellUtil.tagsIterator(kv.getTagsArray(), kv.getTagsOffset(),
-        kv.getTagsLength());
+      kv.getTagsLength());
     assertTrue(tagItr.hasNext());
     next = tagItr.next();
     assertEquals(10, next.getTagLength());
@@ -575,7 +497,7 @@ public class TestKeyValue extends TestCase {
     Bytes.equals(next.getValue(), metaValue2);
     assertFalse(tagItr.hasNext());
   }
-  
+
   public void testMetaKeyComparator() {
     MetaComparator c = new KeyValue.MetaComparator();
     long now = System.currentTimeMillis();
@@ -583,23 +505,23 @@ public class TestKeyValue extends TestCase {
     KeyValue a = new KeyValue(Bytes.toBytes("table1"), now);
     KeyValue b = new KeyValue(Bytes.toBytes("table2"), now);
     assertTrue(c.compare(a, b) < 0);
-    
+
     a = new KeyValue(Bytes.toBytes("table1,111"), now);
     b = new KeyValue(Bytes.toBytes("table2"), now);
     assertTrue(c.compare(a, b) < 0);
-    
+
     a = new KeyValue(Bytes.toBytes("table1"), now);
     b = new KeyValue(Bytes.toBytes("table2,111"), now);
     assertTrue(c.compare(a, b) < 0);
-    
+
     a = new KeyValue(Bytes.toBytes("table,111"), now);
     b = new KeyValue(Bytes.toBytes("table,2222"), now);
     assertTrue(c.compare(a, b) < 0);
-    
+
     a = new KeyValue(Bytes.toBytes("table,111,aaaa"), now);
     b = new KeyValue(Bytes.toBytes("table,2222"), now);
     assertTrue(c.compare(a, b) < 0);
-    
+
     a = new KeyValue(Bytes.toBytes("table,111"), now);
     b = new KeyValue(Bytes.toBytes("table,2222.bbb"), now);
     assertTrue(c.compare(a, b) < 0);
@@ -607,7 +529,7 @@ public class TestKeyValue extends TestCase {
     a = new KeyValue(Bytes.toBytes("table,,aaaa"), now);
     b = new KeyValue(Bytes.toBytes("table,111,bbb"), now);
     assertTrue(c.compare(a, b) < 0);
-    
+
     a = new KeyValue(Bytes.toBytes("table,111,aaaa"), now);
     b = new KeyValue(Bytes.toBytes("table,111,bbb"), now);
     assertTrue(c.compare(a, b) < 0);
@@ -615,238 +537,189 @@ public class TestKeyValue extends TestCase {
     a = new KeyValue(Bytes.toBytes("table,111,xxxx"), now);
     b = new KeyValue(Bytes.toBytes("table,111,222,bbb"), now);
     assertTrue(c.compare(a, b) < 0);
-    
+
     a = new KeyValue(Bytes.toBytes("table,111,11,xxx"), now);
     b = new KeyValue(Bytes.toBytes("table,111,222,bbb"), now);
     assertTrue(c.compare(a, b) < 0);
   }
 
-  public void testKeyValueSerialization() throws Exception {
-    KeyValue kvA1 = new KeyValue(Bytes.toBytes("key"), Bytes.toBytes("cf"), Bytes.toBytes("qualA"),
-        Bytes.toBytes("1"));
-    KeyValue kvA2 = new KeyValue(Bytes.toBytes("key"), Bytes.toBytes("cf"), Bytes.toBytes("qualA"),
-        Bytes.toBytes("2"));
-    MockKeyValue mkvA1 = new MockKeyValue(kvA1);
-    MockKeyValue mkvA2 = new MockKeyValue(kvA2);
-    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
-    DataOutputStream os = new DataOutputStream(byteArrayOutputStream);
-    KeyValueUtil.oswrite(mkvA1, os, true);
-    KeyValueUtil.oswrite(mkvA2, os, true);
-    DataInputStream is = new DataInputStream(new ByteArrayInputStream(
-        byteArrayOutputStream.toByteArray()));
-    KeyValue deSerKV1 = KeyValue.iscreate(is);
-    assertTrue(kvA1.equals(deSerKV1));
-    KeyValue deSerKV2 = KeyValue.iscreate(is);
-    assertTrue(kvA2.equals(deSerKV2));
-  }
-
-  private static class MockKeyValue implements Cell {
-    private final KeyValue kv;
-
-    public MockKeyValue(KeyValue kv) {
-      this.kv = kv;
-    }
-
-    /**
-     * This returns the offset where the tag actually starts.
-     */
-    @Override
-    public int getTagsOffset() {
-      return this.kv.getTagsOffset();
-    }
-
-    // used to achieve atomic operations in the memstore.
-    @Override
-    public long getMvccVersion() {
-      return this.kv.getMvccVersion();
-    }
-
-    /**
-     * used to achieve atomic operations in the memstore.
-     */
-    @Override
-    public long getSequenceId() {
-      return this.kv.getSequenceId();
-    }
-
-    /**
-     * This returns the total length of the tag bytes
-     */
-    @Override
-    public int getTagsLength() {
-      return this.kv.getTagsLength();
-    }
-
-    /**
-     * 
-     * @return Timestamp
-     */
-    @Override
-    public long getTimestamp() {
-      return this.kv.getTimestamp();
-    }
-
-    /**
-     * @return KeyValue.TYPE byte representation
-     */
-    @Override
-    public byte getTypeByte() {
-      return this.kv.getTypeByte();
-    }
-
-    /**
-     * @return the backing array of the entire KeyValue (all KeyValue fields are
-     *         in a single array)
-     */
-    @Override
-    public byte[] getValueArray() {
-      return this.kv.getValueArray();
-    }
-
-    /**
-     * @return the value offset
-     */
-    @Override
-    public int getValueOffset() {
-      return this.kv.getValueOffset();
-    }
-
-    /**
-     * @return Value length
-     */
-    @Override
-    public int getValueLength() {
-      return this.kv.getValueLength();
-    }
-
-    /**
-     * @return the backing array of the entire KeyValue (all KeyValue fields are
-     *         in a single array)
-     */
-    @Override
-    public byte[] getRowArray() {
-      return this.kv.getRowArray();
-    }
-
-    /**
-     * @return Row offset
-     */
-    @Override
-    public int getRowOffset() {
-      return this.kv.getRowOffset();
-    }
-
-    /**
-     * @return Row length
-     */
-    @Override
-    public short getRowLength() {
-      return this.kv.getRowLength();
-    }
+  public void testEqualsAndHashCode() throws Exception {
+    KeyValue kvA1 = new KeyValue(Bytes.toBytes("key"), Bytes.toBytes("cf"),
+        Bytes.toBytes("qualA"), Bytes.toBytes("1"));
+    KeyValue kvA2 = new KeyValue(Bytes.toBytes("key"), Bytes.toBytes("cf"),
+        Bytes.toBytes("qualA"), Bytes.toBytes("2"));
+    // We set a different sequence id on kvA2 to demonstrate that the equals and hashCode also
+    // don't take this into account.
+    kvA2.setSequenceId(2);
+    KeyValue kvB = new KeyValue(Bytes.toBytes("key"), Bytes.toBytes("cf"),
+        Bytes.toBytes("qualB"), Bytes.toBytes("1"));
 
-    /**
-     * @return the backing array of the entire KeyValue (all KeyValue fields are
-     *         in a single array)
-     */
-    @Override
-    public byte[] getFamilyArray() {
-      return this.kv.getFamilyArray();
-    }
+    assertEquals(kvA1, kvA2);
+    assertNotEquals(kvA1, kvB);
+    assertEquals(kvA1.hashCode(), kvA2.hashCode());
+    assertNotEquals(kvA1.hashCode(), kvB.hashCode());
 
-    /**
-     * @return Family offset
-     */
-    @Override
-    public int getFamilyOffset() {
-      return this.kv.getFamilyOffset();
-    }
+  }
 
-    /**
-     * @return Family length
-     */
-    @Override
-    public byte getFamilyLength() {
-      return this.kv.getFamilyLength();
+  public void testKeyValueSerialization() throws Exception {
+    KeyValue[] keyValues = new KeyValue[] {
+      new KeyValue(Bytes.toBytes("key"), Bytes.toBytes("cf"), Bytes.toBytes("qualA"),
+        Bytes.toBytes("1")),
+      new KeyValue(Bytes.toBytes("key"), Bytes.toBytes("cf"), Bytes.toBytes("qualA"),
+        Bytes.toBytes("2")),
+      new KeyValue(Bytes.toBytes("key"), Bytes.toBytes("cf"), Bytes.toBytes("qualA"),
+        System.currentTimeMillis(), Bytes.toBytes("2"),
+        new Tag[] { new Tag((byte) 120, "tagA"),
+        new Tag((byte) 121, Bytes.toBytes("tagB")) }),
+      new KeyValue(Bytes.toBytes("key"), Bytes.toBytes("cf"), Bytes.toBytes("qualA"),
+        System.currentTimeMillis(), Bytes.toBytes("2"),
+        new Tag[] { new Tag((byte) 0, "tagA") }),
+      new KeyValue(Bytes.toBytes("key"), Bytes.toBytes("cf"), Bytes.toBytes(""),
+        Bytes.toBytes("1")) };
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    for (KeyValue kv : keyValues) {
+      DataOutputStream os = new DataOutputStream(byteArrayOutputStream);
+      KeyValueUtil.oswrite(kv, os, true);
     }
-
-    /**
-     * @return the backing array of the entire KeyValue (all KeyValue fields are
-     *         in a single array)
-     */
-    @Override
-    public byte[] getQualifierArray() {
-      return this.kv.getQualifierArray();
+    DataInputStream is =
+        new DataInputStream(new ByteArrayInputStream(byteArrayOutputStream.toByteArray()));
+    for (int i = 0; i < keyValues.length; i++) {
+      LOG.info("Case#" + i + ": deserialize the kv: " + keyValues[i]);
+      KeyValue destKv = KeyValueUtil.createKeyValueFromInputStream(is, true);
+      assertEquals(keyValues[i], destKv);
+      assertArrayEquals(CellUtil.cloneValue(keyValues[i]), CellUtil.cloneValue(destKv));
+      assertArrayEquals(CellUtil.cloneTags(keyValues[i]), CellUtil.cloneTags(destKv));
     }
+  }
 
-    /**
-     * @return Qualifier offset
-     */
-    @Override
-    public int getQualifierOffset() {
-      return this.kv.getQualifierOffset();
+  @Test
+  public void testNullByteArrayKeyValueFailure() {
+    //can't add to testCheckKeyValueBytesFailureCase because it
+    //goes through the InputStream KeyValue API which can't produce a null buffer
+    try {
+      KeyValue kv = new KeyValue(null, 0, 0);
+    } catch (IllegalArgumentException iae){
+      assertEquals("Invalid to have null byte array in KeyValue.", iae.getMessage());
+      return;
     }
+    fail("Should have thrown an IllegalArgumentException when " +
+        "creating a KeyValue with a null buffer");
+  }
 
-    /**
-     * @return Qualifier length
-     */
-    @Override
-    public int getQualifierLength() {
-      return this.kv.getQualifierLength();
+  private static class FailureCase {
+    byte[] buf;
+    int offset;
+    int length;
+    boolean withTags;
+    String expectedMessage;
+
+    public FailureCase(byte[] buf, int offset, int length, boolean withTags,
+        String expectedMessage) {
+      this.buf = buf;
+      this.offset = offset;
+      this.length = length;
+      this.withTags = withTags;
+      this.expectedMessage = expectedMessage;
     }
 
     @Override
-    @Deprecated
-    public byte[] getValue() {
-      // TODO Auto-generated method stub
-      return null;
+    public String toString() {
+      return "FailureCaseDetails: [buf=" + Bytes.toStringBinary(buf, offset, length) + ", offset="
+          + offset + ", " + "length=" + length + ", expectedMessage=" + expectedMessage
+          + ", withtags=" + withTags + "]";
     }
 
-    @Override
-    @Deprecated
-    public byte[] getFamily() {
-      // TODO Auto-generated method stub
-      return null;
+    public String getExpectedMessage() {
+      return this.expectedMessage + KeyValueUtil.bytesToHex(buf, offset, length);
     }
+  }
 
-    @Override
-    @Deprecated
-    public byte[] getQualifier() {
-      // TODO Auto-generated method stub
-      return null;
+  @Test
+  public void testCheckKeyValueBytesFailureCase() throws Exception {
+    byte[][] inputs = new byte[][] { HConstants.EMPTY_BYTE_ARRAY, // case.0
+        Bytes.toBytesBinary("a"), // case.1
+        Bytes.toBytesBinary("\\x00\\x00\\x00\\x01"), // case.2
+        Bytes.toBytesBinary("\\x00\\x00\\x00\\x01\\x00"), // case.3
+        Bytes.toBytesBinary("\\x00\\x00\\x00\\x01\\x00\\x00\\x00\\x01"), // case.4
+        Bytes.toBytesBinary("\\x00\\x00\\x00\\x01\\x00\\x00\\x00\\x01\\x00"), // case.5
+        Bytes.toBytesBinary("\\x00\\x00\\x00\\x01\\x00\\x00\\x00\\x01\\x00\\x01"), // case.6
+        Bytes.toBytesBinary("\\x00\\x00\\x00\\x01\\x00\\x00\\x00\\x01\\x00\\x03ROW"), // case.7
+        Bytes.toBytesBinary("\\x00\\x00\\x00\\x01\\x00\\x00\\x00\\x01\\x00\\x03ROW\\x01"), // case.8
+        Bytes.toBytesBinary("\\x00\\x00\\x00\\x11\\x00\\x00\\x00\\x01\\x00\\x03ROW\\x01FQ\\xFF"
+            + "\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\x03"), // case.9
+        Bytes.toBytesBinary("\\x00\\x00\\x00\\x11\\x00\\x00\\x00\\x01\\x00\\x03ROW\\x01FQ\\x00"
+            + "\\x00\\x00\\x00\\x00\\x00\\x00\\x01\\x03"), // case.10
+        Bytes.toBytesBinary("\\x00\\x00\\x00\\x11\\x00\\x00\\x00\\x01\\x00\\x03ROW\\x01FQ\\x00"
+            + "\\x00\\x00\\x00\\x00\\x00\\x00\\x01\\x04"), // case.11
+        Bytes.toBytesBinary("\\x00\\x00\\x00\\x11\\x00\\x00\\x00\\x01\\x00\\x03ROW\\x01FQ\\x00"
+            + "\\x00\\x00\\x00\\x00\\x00\\x00\\x01\\x04VALUE"), // case.12
+    };
+    String[] outputs = new String[] { "Overflow when reading key length at position=0",
+      "Overflow when reading key length at position=0",
+      "Invalid key length in KeyValue. keyLength=1",
+      "Overflow when reading value length at position=4",
+      "Invalid value length in KeyValue, valueLength=1",
+      "Overflow when reading row length at position=8",
+      "Invalid row length in KeyValue, rowLength=1",
+      "Overflow when reading family length at position=13",
+      "Invalid family length in KeyValue, familyLength=1", "Timestamp cannot be negative, ts=-1",
+      "Invalid type in KeyValue, type=3", "Overflow when reading value part at position=25",
+      "Invalid tags length in KeyValue at position=26"};
+    byte[][] withTagsInputs = new byte[][] {
+      Bytes.toBytesBinary("\\x00\\x00\\x00\\x11\\x00\\x00\\x00\\x01\\x00\\x03ROW\\x01FQ\\x00"
+        + "\\x00\\x00\\x00\\x00\\x00\\x00\\x01\\x04V\\x01"), // case.13
+      Bytes.toBytesBinary("\\x00\\x00\\x00\\x11\\x00\\x00\\x00\\x01\\x00\\x03ROW\\x01FQ\\x00"
+        + "\\x00\\x00\\x00\\x00\\x00\\x00\\x01\\x04V\\x00\\x01"), // case.14
+      Bytes.toBytesBinary("\\x00\\x00\\x00\\x11\\x00\\x00\\x00\\x01\\x00\\x03ROW\\x01FQ\\x00"
+        + "\\x00\\x00\\x00\\x00\\x00\\x00\\x01\\x04V\\x00\\x04\\x00\\x03\\x00A"), // case.15
+      Bytes.toBytesBinary("\\x00\\x00\\x00\\x11\\x00\\x00\\x00\\x01\\x00\\x03ROW\\x01FQ\\x00"
+        + "\\x00\\x00\\x00\\x00\\x00\\x00\\x01\\x04V\\x00\\x0A\\x00\\x04\\x00TAG\\x00\\x04"
+        + "\\xFFT"), // case.16
+      Bytes.toBytesBinary("\\x00\\x00\\x00\\x11\\x00\\x00\\x00\\x01\\x00\\x03ROW\\x01FQ\\x00"
+        + "\\x00\\x00\\x00\\x00\\x00\\x00\\x01\\x04V\\x00\\x0C\\x00\\x04\\x00TAG\\x00\\x05"
+        + "\\xF0COME\\x00"), // case.17
+      Bytes.toBytesBinary("\\x00\\x00\\x00\\x11\\x00\\x00\\x00\\x01\\x00\\x03ROW\\x01FQ\\x00"
+        + "\\x00\\x00\\x00\\x00\\x00\\x00\\x01\\x04V\\x00\\x0C\\x00\\x04\\x00TAG\\x00\\x05"
+        + "\\xF0COME"), // case.18
+      Bytes.toBytesBinary("\\x00\\x00\\x00\\x11\\x00\\x00\\x00\\x01\\x00\\x03ROW\\x01FQ\\x00"
+        + "\\x00\\x00\\x00\\x00\\x00\\x00\\x01\\x04V\\x00\\x00"), // case.19
+      Bytes.toBytesBinary("\\x00\\x00\\x00\\x11\\x00\\x00\\x00\\x01\\x00\\x03ROW\\x01FQ\\x00"
+        + "\\x00\\x00\\x00\\x00\\x00\\x00\\x01\\x04V\\x00\\x1B\\x00\\x05\\x01TAG1\\x00\\x05"
+        + "\\x02TAG2\\x00\\x05\\x03TAG3\\x00\\x05\\x04TAG4"), // case.20
+    };
+    String[] withTagsOutputs = new String[] { "Overflow when reading tags length at position=26",
+      "Invalid tags length in KeyValue at position=26",
+      "Invalid tag length at position=28, tagLength=3",
+      "Invalid tag length at position=34, tagLength=4",
+      "Some redundant bytes in KeyValue's buffer, startOffset=41, endOffset=42", null, null,
+      null, };
+    assertEquals(inputs.length, outputs.length);
+    assertEquals(withTagsInputs.length, withTagsOutputs.length);
+
+    FailureCase[] cases = new FailureCase[inputs.length + withTagsInputs.length];
+    for (int i = 0; i < inputs.length; i++) {
+      cases[i] = new FailureCase(inputs[i], 0, inputs[i].length, false, outputs[i]);
     }
-
-    @Override
-    @Deprecated
-    public byte[] getRow() {
-      // TODO Auto-generated method stub
-      return null;
+    for (int i = 0; i < withTagsInputs.length; i++) {
+      cases[inputs.length + i] =
+          new FailureCase(withTagsInputs[i], 0, withTagsInputs[i].length, true, withTagsOutputs[i]);
     }
 
-    /**
-     * @return the backing array of the entire KeyValue (all KeyValue fields are
-     *         in a single array)
-     */
-    @Override
-    public byte[] getTagsArray() {
-      return this.kv.getTagsArray();
+    for (int i = 0; i < cases.length; i++) {
+      FailureCase c = cases[i];
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      DataOutputStream os = new DataOutputStream(baos);
+      ByteBufferUtils.putInt(os, c.length);
+      os.write(c.buf, c.offset, c.length);
+      try {
+        KeyValueUtil.createKeyValueFromInputStream(
+          new DataInputStream(new ByteArrayInputStream(baos.toByteArray())), c.withTags);
+        if (c.expectedMessage != null) {
+          fail("Should fail when parse key value from an invalid bytes for case#" + i + ". " + c);
+        }
+      } catch (IllegalArgumentException e) {
+        assertEquals("Case#" + i + " failed," + c, c.getExpectedMessage(), e.getMessage());
+      }
     }
   }
-
-  public void testEqualsAndHashCode() throws Exception {
-    KeyValue kvA1 = new KeyValue(Bytes.toBytes("key"), Bytes.toBytes("cf"),
-        Bytes.toBytes("qualA"), Bytes.toBytes("1"));
-    KeyValue kvA2 = new KeyValue(Bytes.toBytes("key"), Bytes.toBytes("cf"),
-        Bytes.toBytes("qualA"), Bytes.toBytes("2"));
-    // We set a different sequence id on kvA2 to demonstrate that the equals and hashCode also
-    // don't take this into account.
-    kvA2.setSequenceId(2);
-    KeyValue kvB = new KeyValue(Bytes.toBytes("key"), Bytes.toBytes("cf"),
-        Bytes.toBytes("qualB"), Bytes.toBytes("1"));
-
-    assertEquals(kvA1, kvA2);
-    assertNotEquals(kvA1, kvB);
-    assertEquals(kvA1.hashCode(), kvA2.hashCode());
-    assertNotEquals(kvA1.hashCode(), kvB.hashCode());
-  }
-
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java
index da397f1..70e87d8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java
@@ -134,34 +134,6 @@ public class TestDataBlockEncoders {
   }
 
   /**
-   * Test KeyValues with negative timestamp.
-   *
-   * @throws IOException
-   *           On test failure.
-   */
-  @Test
-  public void testNegativeTimestamps() throws IOException {
-    List<KeyValue> kvList = new ArrayList<KeyValue>();
-    byte[] row = new byte[0];
-    byte[] family = new byte[0];
-    byte[] qualifier = new byte[0];
-    byte[] value = new byte[0];
-    if (includesTags) {
-      byte[] metaValue1 = Bytes.toBytes("metaValue1");
-      byte[] metaValue2 = Bytes.toBytes("metaValue2");
-      kvList.add(new KeyValue(row, family, qualifier, 0l, value, new Tag[] { new Tag((byte) 1,
-          metaValue1) }));
-      kvList.add(new KeyValue(row, family, qualifier, 0l, value, new Tag[] { new Tag((byte) 1,
-          metaValue2) }));
-    } else {
-      kvList.add(new KeyValue(row, family, qualifier, -1l, Type.Put, value));
-      kvList.add(new KeyValue(row, family, qualifier, -2l, Type.Put, value));
-    }
-    testEncodersOnDataset(kvList, includesMemstoreTS, includesTags);
-  }
-
-
-  /**
    * Test whether compression -> decompression gives the consistent results on
    * pseudorandom sample.
    * @throws IOException On test failure.
@@ -441,7 +413,7 @@ public class TestDataBlockEncoders {
     byte[] family = new byte[0];
     byte[] qualifier = new byte[0];
     byte[] value = new byte[0];
-    KeyValue expectedKV = new KeyValue(row, family, qualifier, -1L, Type.Put, value);
+    KeyValue expectedKV = new KeyValue(row, family, qualifier, 1L, Type.Put, value);
     kvList.add(expectedKV);
     DataBlockEncoding encoding = DataBlockEncoding.ROW_INDEX_V1;
     DataBlockEncoder encoder = encoding.getEncoder();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
index c72c039..7448ddd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
@@ -368,11 +368,11 @@ public class TestCacheOnWrite {
         tags[0] = t;
         kv =
             new KeyValue(row, 0, row.length, cf, 0, cf.length, qualifier, 0, qualifier.length,
-                rand.nextLong(), generateKeyType(rand), value, 0, value.length, tagList);
+                Math.abs(rand.nextLong()), generateKeyType(rand), value, 0, value.length, tagList);
       } else {
         kv =
             new KeyValue(row, 0, row.length, cf, 0, cf.length, qualifier, 0, qualifier.length,
-                rand.nextLong(), generateKeyType(rand), value, 0, value.length);
+                Math.abs(rand.nextLong()), generateKeyType(rand), value, 0, value.length);
       }
       sfw.append(kv);
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileSeek.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileSeek.java
index e502eb6..3734f20 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileSeek.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileSeek.java
@@ -192,15 +192,15 @@ public class TestHFileSeek extends TestCase {
     timer.start();
     for (int i = 0; i < options.seekCount; ++i) {
       kSampler.next(key);
-      byte [] k = new byte [key.getLength()];
+      byte[] k = new byte[key.getLength()];
       System.arraycopy(key.getBytes(), 0, k, 0, key.getLength());
-      if (scanner.seekTo(KeyValue.createKeyValueFromKey(k)) >= 0) {
+      KeyValue kv = new KeyValue(k, CF, QUAL);
+      if (scanner.seekTo(kv) >= 0) {
         ByteBuffer bbkey = scanner.getKey();
         ByteBuffer bbval = scanner.getValue();
         totalBytes += bbkey.limit();
         totalBytes += bbval.limit();
-      }
-      else {
+      } else {
         ++miss;
       }
     }