You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2016/01/12 07:11:56 UTC

[12/12] hbase git commit: HBASE-15077 Support OffheapKV write in compaction with out copying data on heap.

HBASE-15077 Support OffheapKV write in compaction with out copying data on heap.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/da932ee3
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/da932ee3
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/da932ee3

Branch: refs/heads/hbase-12439
Commit: da932ee38d29a13acb3e3ff6653402d4c8141d04
Parents: ec47a81
Author: anoopsjohn <an...@gmail.com>
Authored: Tue Jan 12 10:02:39 2016 +0530
Committer: anoopsjohn <an...@gmail.com>
Committed: Tue Jan 12 10:02:39 2016 +0530

----------------------------------------------------------------------
 .../apache/hadoop/hbase/OffheapKeyValue.java    |   2 +-
 .../hadoop/hbase/io/ByteArrayOutputStream.java  | 129 +++++++++++++++++++
 .../hadoop/hbase/io/ByteBufferOutputStream.java |   3 +-
 .../io/ByteBufferSupportDataOutputStream.java   |  44 +++++++
 .../hbase/io/ByteBufferSupportOutputStream.java |  51 ++++++++
 .../io/encoding/BufferedDataBlockEncoder.java   |   4 +-
 .../hadoop/hbase/util/ByteBufferUtils.java      |  26 +---
 .../hadoop/hbase/io/hfile/HFileBlock.java       |  23 +---
 .../io/encoding/TestDataBlockEncoders.java      |  11 +-
 .../io/hfile/TestHFileDataBlockEncoder.java     |   7 +-
 10 files changed, 245 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/da932ee3/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java
index 0af64cd..ced1595 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java
@@ -254,7 +254,7 @@ public class OffheapKeyValue extends ByteBufferedCell implements HeapSize, Clone
       length = keyLen + this.getValueLength() + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
     }
     ByteBufferUtils.putInt(out, length);
-    ByteBufferUtils.writeByteBuffer(out, this.buf, this.offset, length);
+    ByteBufferUtils.copyBufferToStream(out, this.buf, this.offset, length);
     return length + Bytes.SIZEOF_INT;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/da932ee3/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java
new file mode 100644
index 0000000..d951595
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Our own implementation of ByteArrayOutputStream where all methods are NOT synchronized and
+ * supports writing ByteBuffer directly to it.
+ */
+@InterfaceAudience.Private
+public class ByteArrayOutputStream extends OutputStream implements ByteBufferSupportOutputStream {
+
+  // Borrowed from openJDK:
+  // http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/8-b132/java/util/ArrayList.java#221
+  private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
+
+  private byte[] buf;
+  private int pos = 0;
+
+  public ByteArrayOutputStream() {
+    this(32);
+  }
+
+  public ByteArrayOutputStream(int capacity) {
+    this.buf = new byte[capacity];
+  }
+
+  @Override
+  public void write(ByteBuffer b, int off, int len) throws IOException {
+    checkSizeAndGrow(len);
+    ByteBufferUtils.copyFromBufferToArray(this.buf, b, off, this.pos, len);
+    this.pos += len;
+  }
+
+  @Override
+  public void writeInt(int i) throws IOException {
+    checkSizeAndGrow(Bytes.SIZEOF_INT);
+    Bytes.putInt(this.buf, this.pos, i);
+    this.pos += Bytes.SIZEOF_INT;
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    checkSizeAndGrow(Bytes.SIZEOF_BYTE);
+    buf[this.pos] = (byte) b;
+    this.pos++;
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    checkSizeAndGrow(len);
+    System.arraycopy(b, off, this.buf, this.pos, len);
+    this.pos += len;
+  }
+
+  private void checkSizeAndGrow(int extra) {
+    long capacityNeeded = this.pos + (long) extra;
+    if (capacityNeeded > this.buf.length) {
+      // guarantee it's possible to fit
+      if (capacityNeeded > MAX_ARRAY_SIZE) {
+        throw new BufferOverflowException();
+      }
+      // double until hit the cap
+      long nextCapacity = Math.min(this.buf.length << 1, MAX_ARRAY_SIZE);
+      // but make sure there is enough if twice the existing capacity is still too small
+      nextCapacity = Math.max(nextCapacity, capacityNeeded);
+      if (nextCapacity > MAX_ARRAY_SIZE) {
+        throw new BufferOverflowException();
+      }
+      byte[] newBuf = new byte[(int) nextCapacity];
+      System.arraycopy(buf, 0, newBuf, 0, buf.length);
+      buf = newBuf;
+    }
+  }
+
+  /**
+   * Resets the <code>pos</code> field of this byte array output stream to zero. The output stream
+   * can be used again.
+   */
+  public void reset() {
+    this.pos = 0;
+  }
+
+  /**
+   * Copies the content of this Stream into a new byte array.
+   * @return  the contents of this output stream, as new byte array.
+   */
+  public byte toByteArray()[] {
+    return Arrays.copyOf(buf, pos);
+  }
+
+  /**
+   * @return the underlying array where the data gets accumulated
+   */
+  public byte[] getBuffer() {
+    return this.buf;
+  }
+
+  /**
+   * @return The current size of the buffer.
+   */
+  public int size() {
+    return this.pos;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/da932ee3/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
index d91513e..d4bda18 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
@@ -37,7 +37,8 @@ import org.apache.hadoop.hbase.util.Bytes;
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
-public class ByteBufferOutputStream extends OutputStream {
+public class ByteBufferOutputStream extends OutputStream
+    implements ByteBufferSupportOutputStream {
   
   // Borrowed from openJDK:
   // http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/8-b132/java/util/ArrayList.java#221

http://git-wip-us.apache.org/repos/asf/hbase/blob/da932ee3/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferSupportDataOutputStream.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferSupportDataOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferSupportDataOutputStream.java
new file mode 100644
index 0000000..3a52e63
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferSupportDataOutputStream.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+
+/**
+ * Our extension of DataOutputStream which implements ByteBufferSupportOutputStream
+ */
+@InterfaceAudience.Private
+public class ByteBufferSupportDataOutputStream extends DataOutputStream
+    implements ByteBufferSupportOutputStream {
+
+  public ByteBufferSupportDataOutputStream(OutputStream out) {
+    super(out);
+  }
+
+  @Override
+  public void write(ByteBuffer b, int off, int len) throws IOException {
+    ByteBufferUtils.copyBufferToStream(out, b, off, len);
+    written += len;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/da932ee3/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferSupportOutputStream.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferSupportOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferSupportOutputStream.java
new file mode 100644
index 0000000..ccb5c81
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferSupportOutputStream.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Interface adds support for writing {@link ByteBuffer} into OutputStream.
+ */
+@InterfaceAudience.Private
+public interface ByteBufferSupportOutputStream {
+
+  /**
+   * Writes <code>len</code> bytes from the specified ByteBuffer starting at offset <code>off</code>
+   * to this output stream.
+   *
+   * @param b the data.
+   * @param off the start offset in the data.
+   * @param len the number of bytes to write.
+   * @exception IOException
+   *              if an I/O error occurs. In particular, an <code>IOException</code> is thrown if
+   *              the output stream is closed.
+   */
+  void write(ByteBuffer b, int off, int len) throws IOException;
+
+  /**
+   * Writes an <code>int</code> to the underlying output stream as four
+   * bytes, high byte first.
+   * @param i the <code>int</code> to write
+   * @throws IOException if an I/O error occurs.
+   */
+  void writeInt(int i) throws IOException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/da932ee3/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
index 112f258..33e38c7 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
@@ -673,14 +673,14 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
       // Write key
       out.write(keyBuffer.array());
       // Write value
-      ByteBufferUtils.writeByteBuffer(out, this.valueBuffer, this.valueOffset, this.valueLength);
+      ByteBufferUtils.copyBufferToStream(out, this.valueBuffer, this.valueOffset, this.valueLength);
       if (withTags) {
         // 2 bytes tags length followed by tags bytes
         // tags length is serialized with 2 bytes only(short way) even if the type is int.
         // As this is non -ve numbers, we save the sign bit. See HBASE-11437
         out.write((byte) (0xff & (this.tagsLength >> 8)));
         out.write((byte) (0xff & this.tagsLength));
-        ByteBufferUtils.writeByteBuffer(out, this.tagsBuffer, this.tagsOffset, this.tagsLength);
+        ByteBufferUtils.copyBufferToStream(out, this.tagsBuffer, this.tagsOffset, this.tagsLength);
       }
       return lenToWrite + Bytes.SIZEOF_INT;
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/da932ee3/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
index 6e3fcaa..62173c2 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
@@ -27,7 +27,7 @@ import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
+import org.apache.hadoop.hbase.io.ByteBufferSupportOutputStream;
 import org.apache.hadoop.hbase.io.util.StreamUtils;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.WritableUtils;
@@ -141,8 +141,8 @@ public final class ByteBufferUtils {
      // We have writeInt in ByteBufferOutputStream so that it can directly write
      // int to underlying
      // ByteBuffer in one step.
-     if (out instanceof ByteBufferOutputStream) {
-       ((ByteBufferOutputStream) out).writeInt(value);
+     if (out instanceof ByteBufferSupportOutputStream) {
+       ((ByteBufferSupportOutputStream) out).writeInt(value);
      } else {
        StreamUtils.writeInt(out, value);
      }
@@ -179,9 +179,10 @@ public final class ByteBufferUtils {
    */
   public static void copyBufferToStream(OutputStream out, ByteBuffer in,
       int offset, int length) throws IOException {
-    if (in.hasArray()) {
-      out.write(in.array(), in.arrayOffset() + offset,
-          length);
+    if (out instanceof ByteBufferSupportOutputStream) {
+      ((ByteBufferSupportOutputStream) out).write(in, offset, length);
+    } else if (in.hasArray()) {
+      out.write(in.array(), in.arrayOffset() + offset, length);
     } else {
       for (int i = 0; i < length; ++i) {
         out.write(toByte(in, offset + i));
@@ -904,19 +905,6 @@ public final class ByteBufferUtils {
     }
   }
 
-  public static void writeByteBuffer(OutputStream out, ByteBuffer b, int offset, int length)
-      throws IOException {
-    // We have write which takes ByteBuffer in ByteBufferOutputStream so that it
-    // can directly write
-    // bytes from the src ByteBuffer to the destination ByteBuffer. This avoid
-    // need for temp array
-    // creation and copy
-    if (out instanceof ByteBufferOutputStream) {
-      ((ByteBufferOutputStream) out).write(b, offset, length);
-    } else {
-      ByteBufferUtils.copyBufferToStream(out, b, offset, length);
-    }
-  }
   // For testing purpose
   public static String toStringBinary(final ByteBuffer b, int off, int len) {
     StringBuilder result = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/hbase/blob/da932ee3/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index a68d0a6..6916ba0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
-import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
@@ -35,7 +34,9 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
+import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
 import org.apache.hadoop.hbase.io.ByteBuffInputStream;
+import org.apache.hadoop.hbase.io.ByteBufferSupportDataOutputStream;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
@@ -915,7 +916,7 @@ public class HFileBlock implements Cacheable {
       state = State.WRITING;
 
       // We will compress it later in finishBlock()
-      userDataStream = new DataOutputStream(baosInMemory);
+      userDataStream = new ByteBufferSupportDataOutputStream(baosInMemory);
       if (newBlockType == BlockType.DATA) {
         this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream);
       }
@@ -969,11 +970,8 @@ public class HFileBlock implements Cacheable {
      */
     private void finishBlock() throws IOException {
       if (blockType == BlockType.DATA) {
-        BufferGrabbingByteArrayOutputStream baosInMemoryCopy =
-            new BufferGrabbingByteArrayOutputStream();
-        baosInMemory.writeTo(baosInMemoryCopy);
         this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream,
-            baosInMemoryCopy.buf, blockType);
+            baosInMemory.getBuffer(), blockType);
         blockType = dataBlockEncodingCtx.getBlockType();
       }
       userDataStream.flush();
@@ -1011,19 +1009,6 @@ public class HFileBlock implements Cacheable {
           onDiskChecksum, 0, fileContext.getChecksumType(), fileContext.getBytesPerChecksum());
     }
 
-    public static class BufferGrabbingByteArrayOutputStream extends ByteArrayOutputStream {
-      private byte[] buf;
-
-      @Override
-      public void write(byte[] b, int off, int len) {
-        this.buf = b;
-      }
-
-      public byte[] getBuffer() {
-        return this.buf;
-      }
-    }
-
     /**
      * Put the header into the given byte array at the given offset.
      * @param onDiskSize size of the block on disk header + data + checksum

http://git-wip-us.apache.org/repos/asf/hbase/blob/da932ee3/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java
index 00969b2..1ef918c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java
@@ -21,7 +21,6 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -45,8 +44,8 @@ import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.ArrayBackedTag;
 import org.apache.hadoop.hbase.codec.prefixtree.PrefixTreeSeeker;
+import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
 import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.io.hfile.HFileBlock.Writer.BufferGrabbingByteArrayOutputStream;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.nio.SingleByteBuff;
@@ -256,9 +255,7 @@ public class TestDataBlockEncoders {
     for (KeyValue kv : kvs) {
       encoder.encode(kv, encodingContext, dos);
     }
-    BufferGrabbingByteArrayOutputStream stream = new BufferGrabbingByteArrayOutputStream();
-    baos.writeTo(stream);
-    encoder.endBlockEncoding(encodingContext, dos, stream.getBuffer());
+    encoder.endBlockEncoding(encodingContext, dos, baos.getBuffer());
     byte[] encodedData = new byte[baos.size() - ENCODED_DATA_OFFSET];
     System.arraycopy(baos.toByteArray(), ENCODED_DATA_OFFSET, encodedData, 0, encodedData.length);
     if (useOffheapData) {
@@ -398,9 +395,7 @@ public class TestDataBlockEncoders {
       for (KeyValue kv : kvList) {
         encoder.encode(kv, encodingContext, dos);
       }
-      BufferGrabbingByteArrayOutputStream stream = new BufferGrabbingByteArrayOutputStream();
-      baos.writeTo(stream);
-      encoder.endBlockEncoding(encodingContext, dos, stream.getBuffer());
+      encoder.endBlockEncoding(encodingContext, dos, baos.getBuffer());
       byte[] encodedData = baos.toByteArray();
 
       testAlgorithm(encodedData, unencodedDataBuf, encoder);

http://git-wip-us.apache.org/repos/asf/hbase/blob/da932ee3/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java
index 2523a8c..6f434bb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java
@@ -20,7 +20,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -31,12 +30,12 @@ import java.util.List;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
-import org.apache.hadoop.hbase.io.hfile.HFileBlock.Writer.BufferGrabbingByteArrayOutputStream;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.ChecksumType;
@@ -217,9 +216,7 @@ public class TestHFileDataBlockEncoder {
     for (KeyValue kv : kvs) {
       blockEncoder.encode(kv, context, dos);
     }
-    BufferGrabbingByteArrayOutputStream stream = new BufferGrabbingByteArrayOutputStream();
-    baos.writeTo(stream);
-    blockEncoder.endBlockEncoding(context, dos, stream.getBuffer(), BlockType.DATA);
+    blockEncoder.endBlockEncoding(context, dos, baos.getBuffer(), BlockType.DATA);
     byte[] encodedBytes = baos.toByteArray();
     size = encodedBytes.length - block.getDummyHeaderForVersion().length;
     return new HFileBlock(context.getBlockType(), size, size, -1, ByteBuffer.wrap(encodedBytes),