You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2016/08/31 05:15:03 UTC

hbase git commit: HBASE-16531 Move cell block related code out of IPCUtil

Repository: hbase
Updated Branches:
  refs/heads/master ea1552270 -> 647a65ce0


HBASE-16531 Move cell block related code out of IPCUtil


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/647a65ce
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/647a65ce
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/647a65ce

Branch: refs/heads/master
Commit: 647a65ce01d695bd181c8c5029c4ddd63cab9692
Parents: ea15522
Author: zhangduo <zh...@apache.org>
Authored: Tue Aug 30 18:26:42 2016 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Wed Aug 31 13:09:22 2016 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/ipc/AbstractRpcClient.java     |  34 +--
 .../apache/hadoop/hbase/ipc/AsyncRpcClient.java |   8 +-
 .../hadoop/hbase/ipc/CellBlockBuilder.java      | 229 ++++++++++++++++
 .../ipc/CellScannerButNoCodecException.java     |  31 +++
 .../org/apache/hadoop/hbase/ipc/IPCUtil.java    | 272 +++++--------------
 .../apache/hadoop/hbase/ipc/RpcClientImpl.java  |  33 +--
 .../hadoop/hbase/ipc/TestCellBlockBuilder.java  | 196 +++++++++++++
 .../apache/hadoop/hbase/ipc/TestIPCUtil.java    | 185 ++-----------
 .../org/apache/hadoop/hbase/ipc/RpcServer.java  |  10 +-
 .../hadoop/hbase/ipc/AbstractTestIPC.java       |  18 --
 10 files changed, 549 insertions(+), 467 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/647a65ce/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
index 3d3339a..4d0d16b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
@@ -26,10 +26,8 @@ import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 
 import java.io.IOException;
-import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
-import java.net.SocketTimeoutException;
 import java.net.UnknownHostException;
 
 import org.apache.commons.logging.Log;
@@ -42,7 +40,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.codec.KeyValueCodec;
-import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -64,7 +61,7 @@ public abstract class AbstractRpcClient implements RpcClient {
   protected final MetricsConnection metrics;
 
   protected UserProvider userProvider;
-  protected final IPCUtil ipcUtil;
+  protected final CellBlockBuilder cellBlockBuilder;
 
   protected final int minIdleTimeBeforeClose; // if the connection is idle for more than this
   // time (in ms), it will be closed at any moment.
@@ -98,7 +95,7 @@ public abstract class AbstractRpcClient implements RpcClient {
         HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
     this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
     this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true);
-    this.ipcUtil = new IPCUtil(conf);
+    this.cellBlockBuilder = new CellBlockBuilder(conf);
 
     this.minIdleTimeBeforeClose = conf.getInt(IDLE_TIME, 120000); // 2 minutes
     this.conf = conf;
@@ -293,33 +290,6 @@ public abstract class AbstractRpcClient implements RpcClient {
   }
 
   /**
-   * Takes an Exception and the address we were trying to connect to and return an IOException with
-   * the input exception as the cause. The new exception provides the stack trace of the place where
-   * the exception is thrown and some extra diagnostics information. If the exception is
-   * ConnectException or SocketTimeoutException, return a new one of the same type; Otherwise return
-   * an IOException.
-   * @param addr target address
-   * @param exception the relevant exception
-   * @return an exception to throw
-   */
-  protected IOException wrapException(InetSocketAddress addr, Exception exception) {
-    if (exception instanceof ConnectException) {
-      // connection refused; include the host:port in the error
-      return (ConnectException) new ConnectException("Call to " + addr
-          + " failed on connection exception: " + exception).initCause(exception);
-    } else if (exception instanceof SocketTimeoutException) {
-      return (SocketTimeoutException) new SocketTimeoutException("Call to " + addr
-          + " failed because " + exception).initCause(exception);
-    } else if (exception instanceof ConnectionClosingException) {
-      return (ConnectionClosingException) new ConnectionClosingException("Call to " + addr
-          + " failed on local exception: " + exception).initCause(exception);
-    } else {
-      return (IOException) new IOException("Call to " + addr + " failed on local exception: "
-          + exception).initCause(exception);
-    }
-  }
-
-  /**
    * Blocking rpc channel that goes via hbase rpc.
    */
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/hbase/blob/647a65ce/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
index 3d343b4..e368c43 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
@@ -260,11 +260,11 @@ public class AsyncRpcClient extends AbstractRpcClient {
       if (e.getCause() instanceof IOException) {
         throw (IOException) e.getCause();
       } else {
-        throw wrapException(addr, (Exception) e.getCause());
+        throw IPCUtil.wrapException(addr, (Exception) e.getCause());
       }
     } catch (TimeoutException e) {
       CallTimeoutException cte = new CallTimeoutException(promise.toString());
-      throw wrapException(addr, cte);
+      throw IPCUtil.wrapException(addr, cte);
     }
   }
 
@@ -359,7 +359,7 @@ public class AsyncRpcClient extends AbstractRpcClient {
    * @throws java.io.IOException on error on creation cell scanner
    */
   public CellScanner createCellScanner(byte[] cellBlock) throws IOException {
-    return ipcUtil.createCellScanner(this.codec, this.compressor, cellBlock);
+    return cellBlockBuilder.createCellScanner(this.codec, this.compressor, cellBlock);
   }
 
   /**
@@ -370,7 +370,7 @@ public class AsyncRpcClient extends AbstractRpcClient {
    * @throws java.io.IOException if block creation fails
    */
   public ByteBuffer buildCellBlock(CellScanner cells) throws IOException {
-    return ipcUtil.buildCellBlock(this.codec, this.compressor, cells);
+    return cellBlockBuilder.buildCellBlock(this.codec, this.compressor, cells);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/647a65ce/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java
new file mode 100644
index 0000000..072a490
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.io.ByteBufferInputStream;
+import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
+import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
+import org.apache.hadoop.hbase.io.ByteBufferPool;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+
+/**
+ * Helper class for building cell block.
+ */
+@InterfaceAudience.Private
+public class CellBlockBuilder {
+
+  // LOG is being used in TestCellBlockBuilder
+  static final Log LOG = LogFactory.getLog(CellBlockBuilder.class);
+
+  private final Configuration conf;
+  /**
+   * How much we think the decompressor will expand the original compressed content.
+   */
+  private final int cellBlockDecompressionMultiplier;
+
+  private final int cellBlockBuildingInitialBufferSize;
+
+  public CellBlockBuilder(final Configuration conf) {
+    this.conf = conf;
+    this.cellBlockDecompressionMultiplier = conf
+        .getInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 3);
+
+    // Guess that 16k is a good size for rpc buffer. Could go bigger. See the TODO below in
+    // #buildCellBlock.
+    this.cellBlockBuildingInitialBufferSize = ClassSize
+        .align(conf.getInt("hbase.ipc.cellblock.building.initial.buffersize", 16 * 1024));
+  }
+
+  /**
+   * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
+   * <code>compressor</code>.
+   * @param codec to use for encoding
+   * @param compressor to use for encoding
+   * @param cellScanner to encode
+   * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
+   *         passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has
+   *         been flipped and is ready for reading. Use limit to find total size.
+   * @throws IOException if encoding the cells fail
+   */
+  public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
+      final CellScanner cellScanner) throws IOException {
+    if (cellScanner == null) {
+      return null;
+    }
+    if (codec == null) {
+      throw new CellScannerButNoCodecException();
+    }
+    int bufferSize = this.cellBlockBuildingInitialBufferSize;
+    ByteBufferOutputStream baos = new ByteBufferOutputStream(bufferSize);
+    encodeCellsTo(baos, cellScanner, codec, compressor);
+    if (LOG.isTraceEnabled()) {
+      if (bufferSize < baos.size()) {
+        LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + baos.size()
+            + "; up hbase.ipc.cellblock.building.initial.buffersize?");
+      }
+    }
+    ByteBuffer bb = baos.getByteBuffer();
+    // If no cells, don't mess around. Just return null (could be a bunch of existence checking
+    // gets or something -- stuff that does not return a cell).
+    if (!bb.hasRemaining()) return null;
+    return bb;
+  }
+
+  private void encodeCellsTo(ByteBufferOutputStream bbos, CellScanner cellScanner, Codec codec,
+      CompressionCodec compressor) throws IOException {
+    OutputStream os = bbos;
+    Compressor poolCompressor = null;
+    try {
+      if (compressor != null) {
+        if (compressor instanceof Configurable) {
+          ((Configurable) compressor).setConf(this.conf);
+        }
+        poolCompressor = CodecPool.getCompressor(compressor);
+        os = compressor.createOutputStream(os, poolCompressor);
+      }
+      Codec.Encoder encoder = codec.getEncoder(os);
+      while (cellScanner.advance()) {
+        encoder.write(cellScanner.current());
+      }
+      encoder.flush();
+    } catch (BufferOverflowException e) {
+      throw new DoNotRetryIOException(e);
+    } finally {
+      os.close();
+      if (poolCompressor != null) {
+        CodecPool.returnCompressor(poolCompressor);
+      }
+    }
+  }
+
+  /**
+   * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
+   * <code>compressor</code>.
+   * @param codec to use for encoding
+   * @param compressor to use for encoding
+   * @param cellScanner to encode
+   * @param pool Pool of ByteBuffers to make use of.
+   * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
+   *         passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has
+   *         been flipped and is ready for reading. Use limit to find total size. If
+   *         <code>pool</code> was not null, then this returned ByteBuffer came from there and
+   *         should be returned to the pool when done.
+   * @throws IOException if encoding the cells fail
+   */
+  public ByteBufferListOutputStream buildCellBlockStream(Codec codec, CompressionCodec compressor,
+      CellScanner cellScanner, ByteBufferPool pool) throws IOException {
+    if (cellScanner == null) {
+      return null;
+    }
+    if (codec == null) {
+      throw new CellScannerButNoCodecException();
+    }
+    assert pool != null;
+    ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(pool);
+    encodeCellsTo(bbos, cellScanner, codec, compressor);
+    if (bbos.size() == 0) {
+      bbos.releaseResources();
+      return null;
+    }
+    return bbos;
+  }
+
+  /**
+   * @param codec to use for cellblock
+   * @param cellBlock to encode
+   * @return CellScanner to work against the content of <code>cellBlock</code>
+   * @throws IOException if encoding fails
+   */
+  public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
+      final byte[] cellBlock) throws IOException {
+    // Use this method from Client side to create the CellScanner
+    ByteBuffer cellBlockBuf = ByteBuffer.wrap(cellBlock);
+    if (compressor != null) {
+      cellBlockBuf = decompress(compressor, cellBlockBuf);
+    }
+    // Not making the Decoder over the ByteBuffer purposefully. The Decoder over the BB will
+    // make Cells directly over the passed BB. This method is called at client side and we don't
+    // want the Cells to share the same byte[] where the RPC response is being read. Caching of any
+    // of the Cells at user's app level will make it not possible to GC the response byte[]
+    return codec.getDecoder(new ByteBufferInputStream(cellBlockBuf));
+  }
+
+  /**
+   * @param codec to use for cellblock
+   * @param cellBlock ByteBuffer containing the cells written by the Codec. The buffer should be
+   *          position()'ed at the start of the cell block and limit()'ed at the end.
+   * @return CellScanner to work against the content of <code>cellBlock</code>. All cells created
+   *         out of the CellScanner will share the same ByteBuffer being passed.
+   * @throws IOException if cell encoding fails
+   */
+  public CellScanner createCellScannerReusingBuffers(final Codec codec,
+      final CompressionCodec compressor, ByteBuffer cellBlock) throws IOException {
+    // Use this method from HRS to create the CellScanner
+    // If compressed, decompress it first before passing it on else we will leak compression
+    // resources if the stream is not closed properly after we let it out.
+    if (compressor != null) {
+      cellBlock = decompress(compressor, cellBlock);
+    }
+    return codec.getDecoder(cellBlock);
+  }
+
+  private ByteBuffer decompress(CompressionCodec compressor, ByteBuffer cellBlock)
+      throws IOException {
+    // GZIPCodec fails w/ NPE if no configuration.
+    if (compressor instanceof Configurable) {
+      ((Configurable) compressor).setConf(this.conf);
+    }
+    Decompressor poolDecompressor = CodecPool.getDecompressor(compressor);
+    CompressionInputStream cis = compressor.createInputStream(new ByteBufferInputStream(cellBlock),
+      poolDecompressor);
+    ByteBufferOutputStream bbos;
+    try {
+      // TODO: This is ugly. The buffer will be resized on us if we guess wrong.
+      // TODO: Reuse buffers.
+      bbos = new ByteBufferOutputStream(
+          cellBlock.remaining() * this.cellBlockDecompressionMultiplier);
+      IOUtils.copy(cis, bbos);
+      bbos.close();
+      cellBlock = bbos.getByteBuffer();
+    } finally {
+      CodecPool.returnDecompressor(poolDecompressor);
+    }
+    return cellBlock;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/647a65ce/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellScannerButNoCodecException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellScannerButNoCodecException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellScannerButNoCodecException.java
new file mode 100644
index 0000000..ffd27b3
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellScannerButNoCodecException.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Thrown if a cellscanner but no codec to encode it with.
+ */
+@SuppressWarnings("serial")
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class CellScannerButNoCodecException extends HBaseIOException {
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/647a65ce/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
index 74f934c..c18bd7e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
@@ -20,221 +20,25 @@ package org.apache.hadoop.hbase.ipc;
 import com.google.common.base.Preconditions;
 import com.google.protobuf.CodedOutputStream;
 import com.google.protobuf.Message;
+
 import java.io.IOException;
 import java.io.OutputStream;
-import java.nio.BufferOverflowException;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HBaseIOException;
+
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.codec.Codec;
-import org.apache.hadoop.hbase.io.ByteBufferInputStream;
-import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
-import org.apache.hadoop.hbase.io.ByteBufferPool;
-import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
+import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ClassSize;
-import org.apache.hadoop.io.compress.CodecPool;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionInputStream;
-import org.apache.hadoop.io.compress.Compressor;
-import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.ipc.RemoteException;
 
 /**
  * Utility to help ipc'ing.
  */
 @InterfaceAudience.Private
 public class IPCUtil {
-  // LOG is being used in TestIPCUtil
-  public static final Log LOG = LogFactory.getLog(IPCUtil.class);
-  /**
-   * How much we think the decompressor will expand the original compressed content.
-   */
-  private final int cellBlockDecompressionMultiplier;
-  private final int cellBlockBuildingInitialBufferSize;
-  private final Configuration conf;
-
-  public IPCUtil(final Configuration conf) {
-    super();
-    this.conf = conf;
-    this.cellBlockDecompressionMultiplier =
-        conf.getInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 3);
-
-    // Guess that 16k is a good size for rpc buffer.  Could go bigger.  See the TODO below in
-    // #buildCellBlock.
-    this.cellBlockBuildingInitialBufferSize =
-      ClassSize.align(conf.getInt("hbase.ipc.cellblock.building.initial.buffersize", 16 * 1024));
-  }
-
-  /**
-   * Thrown if a cellscanner but no codec to encode it with.
-   */
-  public static class CellScannerButNoCodecException extends HBaseIOException {};
-
-  /**
-   * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
-   * <code>compressor</code>.
-   * @param codec to use for encoding
-   * @param compressor to use for encoding
-   * @param cellScanner to encode
-   * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
-   *   passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has been
-   *   flipped and is ready for reading.  Use limit to find total size.
-   * @throws IOException if encoding the cells fail
-   */
-  @SuppressWarnings("resource")
-  public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
-      final CellScanner cellScanner) throws IOException {
-    if (cellScanner == null) {
-      return null;
-    }
-    if (codec == null) {
-      throw new CellScannerButNoCodecException();
-    }
-    int bufferSize = this.cellBlockBuildingInitialBufferSize;
-    ByteBufferOutputStream baos = new ByteBufferOutputStream(bufferSize);
-    encodeCellsTo(baos, cellScanner, codec, compressor);
-    if (LOG.isTraceEnabled()) {
-      if (bufferSize < baos.size()) {
-        LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + baos.size()
-            + "; up hbase.ipc.cellblock.building.initial.buffersize?");
-      }
-    }
-    ByteBuffer bb = baos.getByteBuffer();
-    // If no cells, don't mess around. Just return null (could be a bunch of existence checking
-    // gets or something -- stuff that does not return a cell).
-    if (!bb.hasRemaining()) return null;
-    return bb;
-  }
-
-  private void encodeCellsTo(ByteBufferOutputStream bbos, CellScanner cellScanner, Codec codec,
-      CompressionCodec compressor) throws IOException {
-    OutputStream os = bbos;
-    Compressor poolCompressor = null;
-    try  {
-      if (compressor != null) {
-        if (compressor instanceof Configurable) {
-          ((Configurable) compressor).setConf(this.conf);
-        }
-        poolCompressor = CodecPool.getCompressor(compressor);
-        os = compressor.createOutputStream(os, poolCompressor);
-      }
-      Codec.Encoder encoder = codec.getEncoder(os);
-      while (cellScanner.advance()) {
-        encoder.write(cellScanner.current());
-      }
-      encoder.flush();
-    } catch (BufferOverflowException e) {
-      throw new DoNotRetryIOException(e);
-    } finally {
-      os.close();
-      if (poolCompressor != null) {
-        CodecPool.returnCompressor(poolCompressor);
-      }
-    }
-  }
-
-  /**
-   * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
-   * <code>compressor</code>.
-   * @param codec to use for encoding
-   * @param compressor to use for encoding
-   * @param cellScanner to encode
-   * @param pool Pool of ByteBuffers to make use of.
-   * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
-   *   passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has been
-   *   flipped and is ready for reading.  Use limit to find total size. If <code>pool</code> was not
-   *   null, then this returned ByteBuffer came from there and should be returned to the pool when
-   *   done.
-   * @throws IOException if encoding the cells fail
-   */
-  @SuppressWarnings("resource")
-  public ByteBufferListOutputStream buildCellBlockStream(Codec codec, CompressionCodec compressor,
-      CellScanner cellScanner, ByteBufferPool pool) throws IOException {
-    if (cellScanner == null) {
-      return null;
-    }
-    if (codec == null) {
-      throw new CellScannerButNoCodecException();
-    }
-    assert pool != null;
-    ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(pool);
-    encodeCellsTo(bbos, cellScanner, codec, compressor);
-    if (bbos.size() == 0) {
-      bbos.releaseResources();
-      return null;
-    }
-    return bbos;
-  }
-
-  /**
-   * @param codec to use for cellblock
-   * @param cellBlock to encode
-   * @return CellScanner to work against the content of <code>cellBlock</code>
-   * @throws IOException if encoding fails
-   */
-  public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
-      final byte[] cellBlock) throws IOException {
-    // Use this method from Client side to create the CellScanner
-    ByteBuffer cellBlockBuf = ByteBuffer.wrap(cellBlock);
-    if (compressor != null) {
-      cellBlockBuf = decompress(compressor, cellBlockBuf);
-    }
-    // Not making the Decoder over the ByteBuffer purposefully. The Decoder over the BB will
-    // make Cells directly over the passed BB. This method is called at client side and we don't
-    // want the Cells to share the same byte[] where the RPC response is being read. Caching of any
-    // of the Cells at user's app level will make it not possible to GC the response byte[]
-    return codec.getDecoder(new ByteBufferInputStream(cellBlockBuf));
-  }
-
-  /**
-   * @param codec to use for cellblock
-   * @param cellBlock ByteBuffer containing the cells written by the Codec. The buffer should be
-   *   position()'ed at the start of the cell block and limit()'ed at the end.
-   * @return CellScanner to work against the content of <code>cellBlock</code>.
-   *   All cells created out of the CellScanner will share the same ByteBuffer being passed.
-   * @throws IOException if cell encoding fails
-   */
-  public CellScanner createCellScannerReusingBuffers(final Codec codec,
-      final CompressionCodec compressor, ByteBuffer cellBlock) throws IOException {
-    // Use this method from HRS to create the CellScanner
-    // If compressed, decompress it first before passing it on else we will leak compression
-    // resources if the stream is not closed properly after we let it out.
-    if (compressor != null) {
-      cellBlock = decompress(compressor, cellBlock);
-    }
-    return codec.getDecoder(cellBlock);
-  }
-
-  private ByteBuffer decompress(CompressionCodec compressor, ByteBuffer cellBlock)
-      throws IOException {
-    // GZIPCodec fails w/ NPE if no configuration.
-    if (compressor instanceof Configurable) {
-      ((Configurable) compressor).setConf(this.conf);
-    }
-    Decompressor poolDecompressor = CodecPool.getDecompressor(compressor);
-    CompressionInputStream cis = compressor.createInputStream(new ByteBufferInputStream(cellBlock),
-        poolDecompressor);
-    ByteBufferOutputStream bbos;
-    try {
-      // TODO: This is ugly. The buffer will be resized on us if we guess wrong.
-      // TODO: Reuse buffers.
-      bbos = new ByteBufferOutputStream(
-          cellBlock.remaining() * this.cellBlockDecompressionMultiplier);
-      IOUtils.copy(cis, bbos);
-      bbos.close();
-      cellBlock = bbos.getByteBuffer();
-    } finally {
-      CodecPool.returnDecompressor(poolDecompressor);
-    }
-    return cellBlock;
-  }
 
   /**
    * Write out header, param, and cell block if there is one.
@@ -246,10 +50,9 @@ public class IPCUtil {
    * @throws IOException if write action fails
    */
   public static int write(final OutputStream dos, final Message header, final Message param,
-      final ByteBuffer cellBlock)
-  throws IOException {
+      final ByteBuffer cellBlock) throws IOException {
     // Must calculate total size and write that first so other side can read it all in in one
-    // swoop.  This is dictated by how the server is currently written.  Server needs to change
+    // swoop. This is dictated by how the server is currently written. Server needs to change
     // if we are to be able to write without the length prefixing.
     int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param);
     if (cellBlock != null) {
@@ -259,8 +62,7 @@ public class IPCUtil {
   }
 
   private static int write(final OutputStream dos, final Message header, final Message param,
-    final ByteBuffer cellBlock, final int totalSize)
-  throws IOException {
+      final ByteBuffer cellBlock, final int totalSize) throws IOException {
     // I confirmed toBytes does same as DataOutputStream#writeInt.
     dos.write(Bytes.toBytes(totalSize));
     // This allocates a buffer that is the size of the message internally.
@@ -278,9 +80,9 @@ public class IPCUtil {
   /**
    * @return Size on the wire when the two messages are written with writeDelimitedTo
    */
-  public static int getTotalSizeWhenWrittenDelimited(Message ... messages) {
+  public static int getTotalSizeWhenWrittenDelimited(Message... messages) {
     int totalSize = 0;
-    for (Message m: messages) {
+    for (Message m : messages) {
       if (m == null) {
         continue;
       }
@@ -290,4 +92,52 @@ public class IPCUtil {
     Preconditions.checkArgument(totalSize < Integer.MAX_VALUE);
     return totalSize;
   }
+
+  /**
+   * @return True if the exception is a fatal connection exception.
+   */
+  public static boolean isFatalConnectionException(final ExceptionResponse e) {
+    return e.getExceptionClassName().equals(FatalConnectionException.class.getName());
+  }
+
+  /**
+   * @param e exception to be wrapped
+   * @return RemoteException made from passed <code>e</code>
+   */
+  public static RemoteException createRemoteException(final ExceptionResponse e) {
+    String innerExceptionClassName = e.getExceptionClassName();
+    boolean doNotRetry = e.getDoNotRetry();
+    return e.hasHostname() ?
+    // If a hostname then add it to the RemoteWithExtrasException
+        new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), e.getHostname(),
+            e.getPort(), doNotRetry)
+        : new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), doNotRetry);
+  }
+
+  /**
+   * Takes an Exception and the address we were trying to connect to and return an IOException with
+   * the input exception as the cause. The new exception provides the stack trace of the place where
+   * the exception is thrown and some extra diagnostics information. If the exception is
+   * ConnectException or SocketTimeoutException, return a new one of the same type; Otherwise return
+   * an IOException.
+   * @param addr target address
+   * @param exception the relevant exception
+   * @return an exception to throw
+   */
+  public static IOException wrapException(InetSocketAddress addr, Exception exception) {
+    if (exception instanceof ConnectException) {
+      // connection refused; include the host:port in the error
+      return (ConnectException) new ConnectException(
+          "Call to " + addr + " failed on connection exception: " + exception).initCause(exception);
+    } else if (exception instanceof SocketTimeoutException) {
+      return (SocketTimeoutException) new SocketTimeoutException(
+          "Call to " + addr + " failed because " + exception).initCause(exception);
+    } else if (exception instanceof ConnectionClosingException) {
+      return (ConnectionClosingException) new ConnectionClosingException(
+          "Call to " + addr + " failed on local exception: " + exception).initCause(exception);
+    } else {
+      return (IOException) new IOException(
+          "Call to " + addr + " failed on local exception: " + exception).initCause(exception);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/647a65ce/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
index 37b9afd..03b2953 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
@@ -896,7 +896,7 @@ public class RpcClientImpl extends AbstractRpcClient {
       }
       builder.setMethodName(call.md.getName());
       builder.setRequestParam(call.param != null);
-      ByteBuffer cellBlock = ipcUtil.buildCellBlock(this.codec, this.compressor, call.cells);
+      ByteBuffer cellBlock = cellBlockBuilder.buildCellBlock(this.codec, this.compressor, call.cells);
       if (cellBlock != null) {
         CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
         cellBlockBuilder.setLength(cellBlock.limit());
@@ -997,12 +997,12 @@ public class RpcClientImpl extends AbstractRpcClient {
         }
         if (responseHeader.hasException()) {
           ExceptionResponse exceptionResponse = responseHeader.getException();
-          RemoteException re = createRemoteException(exceptionResponse);
+          RemoteException re = IPCUtil.createRemoteException(exceptionResponse);
           call.setException(re);
           call.callStats.setResponseSizeBytes(totalSize);
           call.callStats.setCallTimeMs(
               EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
-          if (isFatalConnectionException(exceptionResponse)) {
+          if (IPCUtil.isFatalConnectionException(exceptionResponse)) {
             markClosed(re);
           }
         } else {
@@ -1017,7 +1017,7 @@ public class RpcClientImpl extends AbstractRpcClient {
             int size = responseHeader.getCellBlockMeta().getLength();
             byte [] cellBlock = new byte[size];
             IOUtils.readFully(this.in, cellBlock, 0, cellBlock.length);
-            cellBlockScanner = ipcUtil.createCellScanner(this.codec, this.compressor, cellBlock);
+            cellBlockScanner = cellBlockBuilder.createCellScanner(this.codec, this.compressor, cellBlock);
           }
           call.setResponse(value, cellBlockScanner);
           call.callStats.setResponseSizeBytes(totalSize);
@@ -1044,29 +1044,6 @@ public class RpcClientImpl extends AbstractRpcClient {
       }
     }
 
-    /**
-     * @return True if the exception is a fatal connection exception.
-     */
-    private boolean isFatalConnectionException(final ExceptionResponse e) {
-      return e.getExceptionClassName().
-        equals(FatalConnectionException.class.getName());
-    }
-
-    /**
-     * @param e exception to be wrapped
-     * @return RemoteException made from passed <code>e</code>
-     */
-    private RemoteException createRemoteException(final ExceptionResponse e) {
-      String innerExceptionClassName = e.getExceptionClassName();
-      boolean doNotRetry = e.getDoNotRetry();
-      return e.hasHostname()?
-        // If a hostname then add it to the RemoteWithExtrasException
-        new RemoteWithExtrasException(innerExceptionClassName,
-          e.getStackTrace(), e.getHostname(), e.getPort(), doNotRetry):
-        new RemoteWithExtrasException(innerExceptionClassName,
-          e.getStackTrace(), doNotRetry);
-    }
-
     protected synchronized boolean markClosed(IOException e) {
       if (e == null){
         throw new NullPointerException();
@@ -1322,7 +1299,7 @@ public class RpcClientImpl extends AbstractRpcClient {
         throw call.error;
       }
       // local exception
-      throw wrapException(addr, call.error);
+      throw IPCUtil.wrapException(addr, call.error);
     }
 
     return call;

http://git-wip-us.apache.org/repos/asf/hbase/blob/647a65ce/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java
new file mode 100644
index 0000000..b780b95
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.commons.lang.time.StopWatch;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.codec.KeyValueCodec;
+import org.apache.hadoop.hbase.io.SizedCellScanner;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.log4j.Level;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ClientTests.class, SmallTests.class })
+public class TestCellBlockBuilder {
+
+  private static final Log LOG = LogFactory.getLog(TestCellBlockBuilder.class);
+
+  CellBlockBuilder builder;
+
+  @Before
+  public void before() {
+    this.builder = new CellBlockBuilder(new Configuration());
+  }
+
+  @Test
+  public void testBuildCellBlock() throws IOException {
+    doBuildCellBlockUndoCellBlock(this.builder, new KeyValueCodec(), null);
+    doBuildCellBlockUndoCellBlock(this.builder, new KeyValueCodec(), new DefaultCodec());
+    doBuildCellBlockUndoCellBlock(this.builder, new KeyValueCodec(), new GzipCodec());
+  }
+
+  static void doBuildCellBlockUndoCellBlock(final CellBlockBuilder util, final Codec codec,
+      final CompressionCodec compressor) throws IOException {
+    doBuildCellBlockUndoCellBlock(util, codec, compressor, 10, 1, false);
+  }
+
+  static void doBuildCellBlockUndoCellBlock(final CellBlockBuilder util, final Codec codec,
+      final CompressionCodec compressor, final int count, final int size, final boolean sized)
+      throws IOException {
+    Cell[] cells = getCells(count, size);
+    CellScanner cellScanner = sized ? getSizedCellScanner(cells)
+        : CellUtil.createCellScanner(Arrays.asList(cells).iterator());
+    ByteBuffer bb = util.buildCellBlock(codec, compressor, cellScanner);
+    cellScanner = util.createCellScannerReusingBuffers(codec, compressor, bb);
+    int i = 0;
+    while (cellScanner.advance()) {
+      i++;
+    }
+    assertEquals(count, i);
+  }
+
+  static CellScanner getSizedCellScanner(final Cell[] cells) {
+    int size = -1;
+    for (Cell cell : cells) {
+      size += CellUtil.estimatedSerializedSizeOf(cell);
+    }
+    final int totalSize = ClassSize.align(size);
+    final CellScanner cellScanner = CellUtil.createCellScanner(cells);
+    return new SizedCellScanner() {
+      @Override
+      public long heapSize() {
+        return totalSize;
+      }
+
+      @Override
+      public Cell current() {
+        return cellScanner.current();
+      }
+
+      @Override
+      public boolean advance() throws IOException {
+        return cellScanner.advance();
+      }
+    };
+  }
+
+  static Cell[] getCells(final int howMany) {
+    return getCells(howMany, 1024);
+  }
+
+  static Cell[] getCells(final int howMany, final int valueSize) {
+    Cell[] cells = new Cell[howMany];
+    byte[] value = new byte[valueSize];
+    for (int i = 0; i < howMany; i++) {
+      byte[] index = Bytes.toBytes(i);
+      KeyValue kv = new KeyValue(index, Bytes.toBytes("f"), index, value);
+      cells[i] = kv;
+    }
+    return cells;
+  }
+
+  private static final String COUNT = "--count=";
+  private static final String SIZE = "--size=";
+
+  /**
+   * Prints usage and then exits w/ passed <code>errCode</code>
+   * @param errCode
+   */
+  private static void usage(final int errCode) {
+    System.out.println("Usage: IPCUtil [options]");
+    System.out.println("Micro-benchmarking how changed sizes and counts work with buffer resizing");
+    System.out.println(" --count  Count of Cells");
+    System.out.println(" --size   Size of Cell values");
+    System.out.println("Example: IPCUtil --count=1024 --size=1024");
+    System.exit(errCode);
+  }
+
+  private static void timerTests(final CellBlockBuilder util, final int count, final int size,
+      final Codec codec, final CompressionCodec compressor) throws IOException {
+    final int cycles = 1000;
+    StopWatch timer = new StopWatch();
+    timer.start();
+    for (int i = 0; i < cycles; i++) {
+      timerTest(util, timer, count, size, codec, compressor, false);
+    }
+    timer.stop();
+    LOG.info("Codec=" + codec + ", compression=" + compressor + ", sized=" + false + ", count="
+        + count + ", size=" + size + ", + took=" + timer.getTime() + "ms");
+    timer.reset();
+    timer.start();
+    for (int i = 0; i < cycles; i++) {
+      timerTest(util, timer, count, size, codec, compressor, true);
+    }
+    timer.stop();
+    LOG.info("Codec=" + codec + ", compression=" + compressor + ", sized=" + true + ", count="
+        + count + ", size=" + size + ", + took=" + timer.getTime() + "ms");
+  }
+
+  private static void timerTest(final CellBlockBuilder util, final StopWatch timer, final int count,
+      final int size, final Codec codec, final CompressionCodec compressor, final boolean sized)
+      throws IOException {
+    doBuildCellBlockUndoCellBlock(util, codec, compressor, count, size, sized);
+  }
+
+  /**
+   * For running a few tests of methods herein.
+   * @param args
+   * @throws IOException
+   */
+  public static void main(String[] args) throws IOException {
+    int count = 1024;
+    int size = 10240;
+    for (String arg : args) {
+      if (arg.startsWith(COUNT)) {
+        count = Integer.parseInt(arg.replace(COUNT, ""));
+      } else if (arg.startsWith(SIZE)) {
+        size = Integer.parseInt(arg.replace(SIZE, ""));
+      } else {
+        usage(1);
+      }
+    }
+    CellBlockBuilder util = new CellBlockBuilder(HBaseConfiguration.create());
+    ((Log4JLogger) CellBlockBuilder.LOG).getLogger().setLevel(Level.ALL);
+    timerTests(util, count, size, new KeyValueCodec(), null);
+    timerTests(util, count, size, new KeyValueCodec(), new DefaultCodec());
+    timerTests(util, count, size, new KeyValueCodec(), new GzipCodec());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/647a65ce/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java
index c90b275..ef534c0 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java
@@ -17,181 +17,28 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
-import static org.junit.Assert.assertEquals;
+import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;
+import static org.junit.Assert.assertTrue;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
 
-import org.apache.commons.lang.time.StopWatch;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.logging.impl.Log4JLogger;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.codec.Codec;
-import org.apache.hadoop.hbase.codec.KeyValueCodec;
-import org.apache.hadoop.hbase.io.SizedCellScanner;
-import org.apache.hadoop.hbase.testclassification.ClientTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ClassSize;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.log4j.Level;
-import org.junit.Before;
+import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
 import org.junit.Test;
-import org.junit.experimental.categories.Category;
 
-@Category({ClientTests.class, SmallTests.class})
 public class TestIPCUtil {
 
-  private static final Log LOG = LogFactory.getLog(TestIPCUtil.class);
-
-  IPCUtil util;
-  @Before
-  public void before() {
-    this.util = new IPCUtil(new Configuration());
-  }
-
   @Test
-  public void testBuildCellBlock() throws IOException {
-    doBuildCellBlockUndoCellBlock(this.util, new KeyValueCodec(), null);
-    doBuildCellBlockUndoCellBlock(this.util, new KeyValueCodec(), new DefaultCodec());
-    doBuildCellBlockUndoCellBlock(this.util, new KeyValueCodec(), new GzipCodec());
-  }
-
-  static void doBuildCellBlockUndoCellBlock(final IPCUtil util,
-      final Codec codec, final CompressionCodec compressor)
-  throws IOException {
-    doBuildCellBlockUndoCellBlock(util, codec, compressor, 10, 1, false);
-  }
-
-  static void doBuildCellBlockUndoCellBlock(final IPCUtil util, final Codec codec,
-    final CompressionCodec compressor, final int count, final int size, final boolean sized)
-  throws IOException {
-    Cell [] cells = getCells(count, size);
-    CellScanner cellScanner = sized? getSizedCellScanner(cells):
-      CellUtil.createCellScanner(Arrays.asList(cells).iterator());
-    ByteBuffer bb = util.buildCellBlock(codec, compressor, cellScanner);
-    cellScanner = util.createCellScannerReusingBuffers(codec, compressor, bb);
-    int i = 0;
-    while (cellScanner.advance()) {
-      i++;
-    }
-    assertEquals(count, i);
-  }
-
-  static CellScanner getSizedCellScanner(final Cell [] cells) {
-    int size = -1;
-    for (Cell cell: cells) {
-      size += CellUtil.estimatedSerializedSizeOf(cell);
-    }
-    final int totalSize = ClassSize.align(size);
-    final CellScanner cellScanner = CellUtil.createCellScanner(cells);
-    return new SizedCellScanner() {
-      @Override
-      public long heapSize() {
-        return totalSize;
-      }
-
-      @Override
-      public Cell current() {
-        return cellScanner.current();
-      }
-
-      @Override
-      public boolean advance() throws IOException {
-        return cellScanner.advance();
-      }
-    };
-  }
-
-  static Cell [] getCells(final int howMany) {
-    return getCells(howMany, 1024);
-  }
-
-  static Cell [] getCells(final int howMany, final int valueSize) {
-    Cell [] cells = new Cell[howMany];
-    byte [] value = new byte[valueSize];
-    for (int i = 0; i < howMany; i++) {
-      byte [] index = Bytes.toBytes(i);
-      KeyValue kv = new KeyValue(index, Bytes.toBytes("f"), index, value);
-      cells[i] = kv;
-    }
-    return cells;
-  }
-
-  private static final String COUNT = "--count=";
-  private static final String SIZE = "--size=";
-
-  /**
-   * Prints usage and then exits w/ passed <code>errCode</code>
-   * @param errCode
-   */
-  private static void usage(final int errCode) {
-    System.out.println("Usage: IPCUtil [options]");
-    System.out.println("Micro-benchmarking how changed sizes and counts work with buffer resizing");
-    System.out.println(" --count  Count of Cells");
-    System.out.println(" --size   Size of Cell values");
-    System.out.println("Example: IPCUtil --count=1024 --size=1024");
-    System.exit(errCode);
-  }
-
-  private static void timerTests(final IPCUtil util, final int count, final int size,
-      final Codec codec, final CompressionCodec compressor)
-  throws IOException {
-    final int cycles = 1000;
-    StopWatch timer = new StopWatch();
-    timer.start();
-    for (int i = 0; i < cycles; i++) {
-      timerTest(util, timer, count, size, codec, compressor, false);
-    }
-    timer.stop();
-    LOG.info("Codec=" + codec + ", compression=" + compressor + ", sized=" + false +
-        ", count=" + count + ", size=" + size + ", + took=" + timer.getTime() + "ms");
-    timer.reset();
-    timer.start();
-    for (int i = 0; i < cycles; i++) {
-      timerTest(util, timer, count, size, codec, compressor, true);
-    }
-    timer.stop();
-    LOG.info("Codec=" + codec + ", compression=" + compressor + ", sized=" + true +
-      ", count=" + count + ", size=" + size + ", + took=" + timer.getTime() + "ms");
-  }
-
-  private static void timerTest(final IPCUtil util, final StopWatch timer, final int count,
-      final int size, final Codec codec, final CompressionCodec compressor, final boolean sized)
-  throws IOException {
-    doBuildCellBlockUndoCellBlock(util, codec, compressor, count, size, sized);
-  }
-
-  /**
-   * For running a few tests of methods herein.
-   * @param args
-   * @throws IOException
-   */
-  public static void main(String[] args) throws IOException {
-    int count = 1024;
-    int size = 10240;
-    for (String arg: args) {
-      if (arg.startsWith(COUNT)) {
-        count = Integer.parseInt(arg.replace(COUNT, ""));
-      } else if (arg.startsWith(SIZE)) {
-        size = Integer.parseInt(arg.replace(SIZE, ""));
-      } else {
-        usage(1);
-      }
-    }
-    IPCUtil util = new IPCUtil(HBaseConfiguration.create());
-    ((Log4JLogger)IPCUtil.LOG).getLogger().setLevel(Level.ALL);
-    timerTests(util, count, size,  new KeyValueCodec(), null);
-    timerTests(util, count, size,  new KeyValueCodec(), new DefaultCodec());
-    timerTests(util, count, size,  new KeyValueCodec(), new GzipCodec());
+  public void testWrapException() throws Exception {
+    final InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", 0);
+    assertTrue(wrapException(address, new ConnectException()) instanceof ConnectException);
+    assertTrue(
+      wrapException(address, new SocketTimeoutException()) instanceof SocketTimeoutException);
+    assertTrue(wrapException(address, new ConnectionClosingException(
+        "Test AbstractRpcClient#wrapException")) instanceof ConnectionClosingException);
+    assertTrue(
+      wrapException(address, new CallTimeoutException("Test AbstractRpcClient#wrapException"))
+          .getCause() instanceof CallTimeoutException);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/647a65ce/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 4b27924..759da82 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -183,7 +183,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
    */
   static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
 
-  private final IPCUtil ipcUtil;
+  private final CellBlockBuilder cellBlockBuilder;
 
   private static final String AUTH_FAILED_FOR = "Auth failed for ";
   private static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
@@ -434,14 +434,14 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
         List<ByteBuffer> cellBlock = null;
         int cellBlockSize = 0;
         if (reservoir != null) {
-          this.cellBlockStream = ipcUtil.buildCellBlockStream(this.connection.codec,
+          this.cellBlockStream = cellBlockBuilder.buildCellBlockStream(this.connection.codec,
               this.connection.compressionCodec, cells, reservoir);
           if (this.cellBlockStream != null) {
             cellBlock = this.cellBlockStream.getByteBuffers();
             cellBlockSize = this.cellBlockStream.size();
           }
         } else {
-          ByteBuffer b = ipcUtil.buildCellBlock(this.connection.codec,
+          ByteBuffer b = cellBlockBuilder.buildCellBlock(this.connection.codec,
               this.connection.compressionCodec, cells);
           if (b != null) {
             cellBlockSize = b.remaining();
@@ -1861,7 +1861,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
         }
         if (header.hasCellBlockMeta()) {
           buf.position(offset);
-          cellScanner = ipcUtil.createCellScannerReusingBuffers(this.codec, this.compressionCodec, buf);
+          cellScanner = cellBlockBuilder.createCellScannerReusingBuffers(this.codec, this.compressionCodec, buf);
         }
       } catch (Throwable t) {
         InetSocketAddress address = getListenerAddress();
@@ -2058,7 +2058,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
     this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true);
     this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true);
 
-    this.ipcUtil = new IPCUtil(conf);
+    this.cellBlockBuilder = new CellBlockBuilder(conf);
 
 
     // Create the responder here

http://git-wip-us.apache.org/repos/asf/hbase/blob/647a65ce/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index 4cfa25c..be5ad56 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -35,10 +35,8 @@ import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 
 import java.io.IOException;
-import java.net.ConnectException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -54,7 +52,6 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.MetricsConnection;
-import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
@@ -394,19 +391,4 @@ public abstract class AbstractTestIPC {
       rpcServer.stop();
     }
   }
-
-  @Test
-  public void testWrapException() throws Exception {
-    AbstractRpcClient client =
-        (AbstractRpcClient) RpcClientFactory.createClient(CONF, "AbstractTestIPC");
-    final InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", 0);
-    assertTrue(client.wrapException(address, new ConnectException()) instanceof ConnectException);
-    assertTrue(client.wrapException(address,
-      new SocketTimeoutException()) instanceof SocketTimeoutException);
-    assertTrue(client.wrapException(address, new ConnectionClosingException(
-        "Test AbstractRpcClient#wrapException")) instanceof ConnectionClosingException);
-    assertTrue(client
-        .wrapException(address, new CallTimeoutException("Test AbstractRpcClient#wrapException"))
-        .getCause() instanceof CallTimeoutException);
-  }
 }