You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2013/11/19 16:39:35 UTC

svn commit: r1543458 - in /hbase/trunk: hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/ hbase-hadoop1-compat/src/main/ja...

Author: stack
Date: Tue Nov 19 15:39:35 2013
New Revision: 1543458

URL: http://svn.apache.org/r1543458
Log:
HBASE-5945 Reduce buffer copies in IPC server response path

Added:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBufferChain.java
Modified:
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
    hbase/trunk/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
    hbase/trunk/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
    hbase/trunk/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java?rev=1543458&r1=1543457&r2=1543458&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java Tue Nov 19 15:39:35 2013
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.ipc;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInput;
-import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -44,7 +43,6 @@ import org.apache.hadoop.io.compress.Com
 import org.apache.hadoop.io.compress.Decompressor;
 
 import com.google.common.base.Preconditions;
-import com.google.protobuf.CodedInputStream;
 import com.google.protobuf.CodedOutputStream;
 import com.google.protobuf.Message;
 
@@ -117,10 +115,15 @@ class IPCUtil {
         os = compressor.createOutputStream(os, poolCompressor);
       }
       Codec.Encoder encoder = codec.getEncoder(os);
+      int count = 0;
       while (cellScanner.advance()) {
         encoder.write(cellScanner.current());
+        count++;
       }
       encoder.flush();
+      // If no cells, don't mess around.  Just return null (could be a bunch of existence checking
+      // gets or something -- stuff that does not return a cell).
+      if (count == 0) return null;
     } finally {
       os.close();
       if (poolCompressor != null) CodecPool.returnCompressor(poolCompressor);
@@ -187,24 +190,23 @@ class IPCUtil {
   }
 
   /**
-   * Write out header, param, and cell block if there to a {@link ByteBufferOutputStream} sized
-   * to hold these elements.
-   * @param header
-   * @param param
-   * @param cellBlock
-   * @return A {@link ByteBufferOutputStream} filled with the content of the passed in
-   * <code>header</code>, <code>param</code>, and <code>cellBlock</code>.
+   * @param m Message to serialize delimited; i.e. w/ a vint of its size preceeding its
+   * serialization.
+   * @return The passed in Message serialized with delimiter.  Return null if <code>m</code> is null
    * @throws IOException
    */
-  static ByteBufferOutputStream write(final Message header, final Message param,
-      final ByteBuffer cellBlock)
-  throws IOException {
-    int totalSize = getTotalSizeWhenWrittenDelimited(header, param);
-    if (cellBlock != null) totalSize += cellBlock.limit();
-    ByteBufferOutputStream bbos = new ByteBufferOutputStream(totalSize);
-    write(bbos, header, param, cellBlock, totalSize);
-    bbos.close();
-    return bbos;
+  static ByteBuffer getDelimitedMessageAsByteBuffer(final Message m) throws IOException {
+    if (m == null) return null;
+    int serializedSize = m.getSerializedSize();
+    int vintSize = CodedOutputStream.computeRawVarint32Size(serializedSize);
+    byte [] buffer = new byte[serializedSize + vintSize];
+    // Passing in a byte array saves COS creating a buffer which it does when using streams.
+    CodedOutputStream cos = CodedOutputStream.newInstance(buffer);
+    // This will write out the vint preamble and the message serialized.
+    cos.writeMessageNoTag(m);
+    cos.flush();
+    cos.checkNoSpaceLeft();
+    return ByteBuffer.wrap(buffer);
   }
 
   /**
@@ -230,8 +232,9 @@ class IPCUtil {
   private static int write(final OutputStream dos, final Message header, final Message param,
     final ByteBuffer cellBlock, final int totalSize)
   throws IOException {
-    // I confirmed toBytes does same as say DataOutputStream#writeInt.
+    // I confirmed toBytes does same as DataOutputStream#writeInt.
     dos.write(Bytes.toBytes(totalSize));
+    // This allocates a buffer that is the size of the message internally.
     header.writeDelimitedTo(dos);
     if (param != null) param.writeDelimitedTo(dos);
     if (cellBlock != null) dos.write(cellBlock.array(), 0, cellBlock.remaining());
@@ -240,20 +243,6 @@ class IPCUtil {
   }
 
   /**
-   * @param in Stream cue'd up just before a delimited message
-   * @return Bytes that hold the bytes that make up the message read from <code>in</code>
-   * @throws IOException
-   */
-  static byte [] getDelimitedMessageBytes(final DataInputStream in) throws IOException {
-    byte b = in.readByte();
-    int size = CodedInputStream.readRawVarint32(b, in);
-    // Allocate right-sized buffer rather than let pb allocate its default minimum 4k.
-    byte [] bytes = new byte[size];
-    IOUtils.readFully(in, bytes);
-    return bytes;
-  }
-
-  /**
    * Read in chunks of 8K (HBASE-7239)
    * @param in
    * @param dest

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java?rev=1543458&r1=1543457&r2=1543458&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java Tue Nov 19 15:39:35 2013
@@ -175,10 +175,13 @@ public final class ProtobufUtil {
     ClientProtos.Result.Builder builder = ClientProtos.Result.newBuilder();
 
     builder.setExists(true);
+    builder.setAssociatedCellCount(0);
     EMPTY_RESULT_PB_EXISTS_TRUE =  builder.build();
 
     builder.clear();
+
     builder.setExists(false);
+    builder.setAssociatedCellCount(0);
     EMPTY_RESULT_PB_EXISTS_FALSE =  builder.build();
 
     builder.clear();
@@ -1192,19 +1195,11 @@ public final class ProtobufUtil {
    * @return the converted protocol buffer Result
    */
   public static ClientProtos.Result toResultNoData(final Result result) {
-    if (result.getExists() != null){
-      return result.getExists() ? EMPTY_RESULT_PB_EXISTS_TRUE : EMPTY_RESULT_PB_EXISTS_FALSE;
-    }
-
+    if (result.getExists() != null) return toResult(result.getExists());
     int size = result.size();
-
-    if (size == 0){
-      return EMPTY_RESULT_PB;
-    }
-
+    if (size == 0) return EMPTY_RESULT_PB;
     ClientProtos.Result.Builder builder = ClientProtos.Result.newBuilder();
     builder.setAssociatedCellCount(size);
-
     return builder.build();
   }
 

Modified: hbase/trunk/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java?rev=1543458&r1=1543457&r2=1543458&view=diff
==============================================================================
--- hbase/trunk/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java (original)
+++ hbase/trunk/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java Tue Nov 19 15:39:35 2013
@@ -62,7 +62,7 @@ public interface MetricsHBaseServerSourc
 
   void authenticationFailure();
 
-  void sentBytes(int count);
+  void sentBytes(long count);
 
   void receivedBytes(int count);
 

Modified: hbase/trunk/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java?rev=1543458&r1=1543457&r2=1543458&view=diff
==============================================================================
--- hbase/trunk/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java (original)
+++ hbase/trunk/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java Tue Nov 19 15:39:35 2013
@@ -86,7 +86,7 @@ public class MetricsHBaseServerSourceImp
   }
 
   @Override
-  public void sentBytes(int count) {
+  public void sentBytes(long count) {
     this.sentBytes.incr(count);
   }
 

Modified: hbase/trunk/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java?rev=1543458&r1=1543457&r2=1543458&view=diff
==============================================================================
--- hbase/trunk/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java (original)
+++ hbase/trunk/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java Tue Nov 19 15:39:35 2013
@@ -89,7 +89,7 @@ public class MetricsHBaseServerSourceImp
   }
 
   @Override
-  public void sentBytes(int count) {
+  public void sentBytes(long count) {
     this.sentBytes.incr(count);
   }
 

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java?rev=1543458&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java Tue Nov 19 15:39:35 2013
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.GatheringByteChannel;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Chain of ByteBuffers.
+ * Used writing out an array of byte buffers.  Writes in chunks.
+ */
+@InterfaceAudience.Private
+class BufferChain {
+  private static final ByteBuffer [] FOR_TOARRAY_TYPE = new ByteBuffer[0];
+  private final ByteBuffer[] buffers;
+  private int remaining = 0;
+  private int bufferOffset = 0;
+
+  BufferChain(ByteBuffer ... buffers) {
+    // Some of the incoming buffers can be null
+    List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(buffers.length);
+    for (ByteBuffer b : buffers) {
+      if (b == null) continue;
+      bbs.add(b);
+      this.remaining += b.remaining();
+    }
+    this.buffers = bbs.toArray(FOR_TOARRAY_TYPE);
+  }
+
+  /**
+   * Expensive.  Makes a new buffer to hold a copy of what is in contained ByteBuffers.  This
+   * call drains this instance; it cannot be used subsequent to the call.
+   * @return A new byte buffer with the content of all contained ByteBuffers.
+   */
+  byte [] getBytes() {
+    if (!hasRemaining()) throw new IllegalAccessError();
+    byte [] bytes = new byte [this.remaining];
+    int offset = 0;
+    for (ByteBuffer bb: this.buffers) {
+      System.arraycopy(bb.array(), bb.arrayOffset(), bytes, offset, bb.limit());
+      offset += bb.capacity();
+    }
+    return bytes;
+  }
+
+  boolean hasRemaining() {
+    return remaining > 0;
+  }
+
+  /**
+   * Write out our chain of buffers in chunks
+   * @param channel Where to write
+   * @param chunkSize Size of chunks to write.
+   * @return Amount written.
+   * @throws IOException
+   */
+  long write(GatheringByteChannel channel, int chunkSize) throws IOException {
+    int chunkRemaining = chunkSize;
+    ByteBuffer lastBuffer = null;
+    int bufCount = 0;
+    int restoreLimit = -1;
+
+    while (chunkRemaining > 0 && bufferOffset + bufCount < buffers.length) {
+      lastBuffer = buffers[bufferOffset + bufCount];
+      if (!lastBuffer.hasRemaining()) {
+        bufferOffset++;
+        continue;
+      }
+      bufCount++;
+      if (lastBuffer.remaining() > chunkRemaining) {
+        restoreLimit = lastBuffer.limit();
+        lastBuffer.limit(lastBuffer.position() + chunkRemaining);
+        chunkRemaining = 0;
+        break;
+      } else {
+        chunkRemaining -= lastBuffer.remaining();
+      }
+    }
+    assert lastBuffer != null;
+    if (chunkRemaining == chunkSize) {
+      assert !hasRemaining();
+      // no data left to write
+      return 0;
+    }
+    try {
+      long ret = channel.write(buffers, bufferOffset, bufCount);
+      if (ret > 0) {
+        remaining -= ret;
+      }
+      return ret;
+    } finally {
+      if (restoreLimit >= 0) {
+        lastBuffer.limit(restoreLimit);
+      }
+    }
+  }
+}
\ No newline at end of file

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java?rev=1543458&r1=1543457&r2=1543458&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java Tue Nov 19 15:39:35 2013
@@ -47,7 +47,7 @@ public class MetricsHBaseServer {
     source.authenticationSuccess();
   }
 
-  void sentBytes(int count) {
+  void sentBytes(long count) {
     source.sentBytes(count);
   }
 

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java?rev=1543458&r1=1543457&r2=1543458&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java Tue Nov 19 15:39:35 2013
@@ -35,6 +35,7 @@ import java.nio.ByteBuffer;
 import java.nio.channels.CancelledKeyException;
 import java.nio.channels.Channels;
 import java.nio.channels.ClosedChannelException;
+import java.nio.channels.GatheringByteChannel;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
@@ -84,12 +85,11 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.security.AuthMethod;
 import org.apache.hadoop.hbase.security.HBasePolicyProvider;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
-import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
 import org.apache.hadoop.hbase.security.SaslStatus;
 import org.apache.hadoop.hbase.security.SaslUtil;
-import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
@@ -276,7 +276,10 @@ public class RpcServer implements RpcSer
     protected Connection connection;              // connection to client
     protected long timestamp;      // the time received when response is null
                                    // the time served when response is not null
-    protected ByteBuffer response;                // the response for this call
+    /**
+     * Chain of buffers to send as response.
+     */
+    protected BufferChain response;
     protected boolean delayResponse;
     protected Responder responder;
     protected boolean delayReturnValue;           // if the return value should be
@@ -341,14 +344,14 @@ public class RpcServer implements RpcSer
     }
 
     protected synchronized void setSaslTokenResponse(ByteBuffer response) {
-      this.response = response;
+      this.response = new BufferChain(response);
     }
 
     protected synchronized void setResponse(Object m, final CellScanner cells,
         Throwable t, String errorMsg) {
       if (this.isError) return;
       if (t != null) this.isError = true;
-      ByteBufferOutputStream bbos = null;
+      BufferChain bc = null;
       try {
         ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
         // Presume it a pb Message.  Could be null.
@@ -380,42 +383,44 @@ public class RpcServer implements RpcSer
           headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
         }
         Message header = headerBuilder.build();
-        bbos = IPCUtil.write(header, result, cellBlock);
+
+        // Organize the response as a set of bytebuffers rather than collect it all together inside
+        // one big byte array; save on allocations.
+        ByteBuffer bbHeader = IPCUtil.getDelimitedMessageAsByteBuffer(header);
+        ByteBuffer bbResult = IPCUtil.getDelimitedMessageAsByteBuffer(result);
+        int totalSize = bbHeader.capacity() + (bbResult == null? 0: bbResult.limit()) +
+          (cellBlock == null? 0: cellBlock.limit());
+        ByteBuffer bbTotalSize = ByteBuffer.wrap(Bytes.toBytes(totalSize));
+        bc = new BufferChain(bbTotalSize, bbHeader, bbResult, cellBlock);
         if (connection.useWrap) {
-          wrapWithSasl(bbos);
+          bc = wrapWithSasl(bc);
         }
       } catch (IOException e) {
         LOG.warn("Exception while creating response " + e);
       }
-      ByteBuffer bb = null;
-      if (bbos != null) {
-        // TODO: If SASL, maybe buffer already been flipped and written?
-        bb = bbos.getByteBuffer();
-        bb.position(0);
-      }
-      this.response = bb;
+      this.response = bc;
     }
 
-    private void wrapWithSasl(ByteBufferOutputStream response)
-    throws IOException {
-      if (connection.useSasl) {
-        // getByteBuffer calls flip()
-        ByteBuffer buf = response.getByteBuffer();
-        byte[] token;
-        // synchronization may be needed since there can be multiple Handler
-        // threads using saslServer to wrap responses.
-        synchronized (connection.saslServer) {
-          token = connection.saslServer.wrap(buf.array(),
-              buf.arrayOffset(), buf.remaining());
-        }
-        if (LOG.isDebugEnabled())
-          LOG.debug("Adding saslServer wrapped token of size " + token.length
-              + " as call response.");
-        buf.clear();
-        DataOutputStream saslOut = new DataOutputStream(response);
-        saslOut.writeInt(token.length);
-        saslOut.write(token, 0, token.length);
-      }
+    private BufferChain wrapWithSasl(BufferChain bc)
+        throws IOException {
+      if (bc == null) return bc;
+      if (!this.connection.useSasl) return bc;
+      // Looks like no way around this; saslserver wants a byte array.  I have to make it one.
+      // THIS IS A BIG UGLY COPY.
+      byte [] responseBytes = bc.getBytes();
+      byte [] token;
+      // synchronization may be needed since there can be multiple Handler
+      // threads using saslServer to wrap responses.
+      synchronized (connection.saslServer) {
+        token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length);
+      }
+      if (LOG.isDebugEnabled())
+        LOG.debug("Adding saslServer wrapped token of size " + token.length
+            + " as call response.");
+
+      ByteBuffer bbTokenLength = ByteBuffer.wrap(Bytes.toBytes(token.length));
+      ByteBuffer bbTokenBytes = ByteBuffer.wrap(token);
+      return new BufferChain(bbTokenLength, bbTokenBytes);
     }
 
     @Override
@@ -994,7 +999,7 @@ public class RpcServer implements RpcSer
           //
           // Send as much data as we can in the non-blocking fashion
           //
-          int numBytes = channelWrite(channel, call.response);
+          long numBytes = channelWrite(channel, call.response);
           if (numBytes < 0) {
             return true;
           }
@@ -1347,6 +1352,7 @@ public class RpcServer implements RpcSer
         }
       }
     }
+
     /**
      * No protobuf encoding of raw sasl messages
      */
@@ -2169,19 +2175,15 @@ public class RpcServer implements RpcSer
    * buffer.
    *
    * @param channel writable byte channel to write to
-   * @param buffer buffer to write
+   * @param bufferChain Chain of buffers to write
    * @return number of bytes written
    * @throws java.io.IOException e
    * @see java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)
    */
-  protected int channelWrite(WritableByteChannel channel,
-                                    ByteBuffer buffer) throws IOException {
-
-    int count =  (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
-           channel.write(buffer) : channelIO(null, channel, buffer);
-    if (count > 0) {
-      metrics.sentBytes(count);
-    }
+  protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain)
+  throws IOException {
+    long count =  bufferChain.write(channel, NIO_BUFFER_LIMIT);
+    if (count > 0) this.metrics.sentBytes(count);
     return count;
   }
 

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1543458&r1=1543457&r2=1543458&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Nov 19 15:39:35 2013
@@ -2834,7 +2834,7 @@ public class HRegionServer implements Cl
       if (existence != null){
         ClientProtos.Result pbr = ProtobufUtil.toResult(existence);
         builder.setResult(pbr);
-      }else  if (r != null) {
+      } else  if (r != null) {
         ClientProtos.Result pbr = ProtobufUtil.toResult(r);
         builder.setResult(pbr);
       }
@@ -3294,6 +3294,7 @@ public class HRegionServer implements Cl
   @Override
   public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
   throws ServiceException {
+    
     // rpc controller is how we bring in data via the back door;  it is unprotobuf'ed data.
     // It is also the conduit via which we pass back data.
     PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java?rev=1543458&r1=1543457&r2=1543458&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java Tue Nov 19 15:39:35 2013
@@ -64,7 +64,6 @@ public class TestFromClientSide3 {
   private final static byte[] VAL_BYTES = Bytes.toBytes("v1");
   private final static byte[] ROW_BYTES = Bytes.toBytes("r1");
 
-
   /**
    * @throws java.lang.Exception
    */
@@ -345,6 +344,7 @@ public class TestFromClientSide3 {
     gets.add(new Get(ROW));
     gets.add(new Get(Bytes.add(ANOTHERROW, new byte[] { 0x00 })));
 
+    LOG.info("Calling exists");
     Boolean[] results = table.exists(gets);
     assertEquals(results[0], false);
     assertEquals(results[1], false);

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java?rev=1543458&r1=1543457&r2=1543458&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java Tue Nov 19 15:39:35 2013
@@ -82,8 +82,7 @@ import org.junit.experimental.categories
 public class TestRegionObserverInterface {
   static final Log LOG = LogFactory.getLog(TestRegionObserverInterface.class);
 
-  public static final TableName TEST_TABLE =
-      TableName.valueOf("TestTable");
+  public static final TableName TEST_TABLE = TableName.valueOf("TestTable");
   public final static byte[] A = Bytes.toBytes("a");
   public final static byte[] B = Bytes.toBytes("b");
   public final static byte[] C = Bytes.toBytes("c");
@@ -111,126 +110,132 @@ public class TestRegionObserverInterface
 
   @Test
   public void testRegionObserver() throws IOException {
-    TableName tableName = TEST_TABLE;
+    TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + ".testRegionObserver");
     // recreate table every time in order to reset the status of the
     // coprocessor.
     HTable table = util.createTable(tableName, new byte[][] {A, B, C});
-    verifyMethodResult(SimpleRegionObserver.class,
-        new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
-            "hadDelete"},
-        TEST_TABLE,
-        new Boolean[] {false, false, false, false, false});
-
-    Put put = new Put(ROW);
-    put.add(A, A, A);
-    put.add(B, B, B);
-    put.add(C, C, C);
-    table.put(put);
+    try {
+      verifyMethodResult(SimpleRegionObserver.class,
+          new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
+      "hadDelete"},
+      tableName,
+      new Boolean[] {false, false, false, false, false});
+
+      Put put = new Put(ROW);
+      put.add(A, A, A);
+      put.add(B, B, B);
+      put.add(C, C, C);
+      table.put(put);
 
-    verifyMethodResult(SimpleRegionObserver.class,
-        new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
-            "hadPreBatchMutate", "hadPostBatchMutate", "hadDelete"},
-        TEST_TABLE,
+      verifyMethodResult(SimpleRegionObserver.class,
+          new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
+        "hadPreBatchMutate", "hadPostBatchMutate", "hadDelete"},
+        tableName,
         new Boolean[] {false, false, true, true, true, true, false}
-    );
-
-    verifyMethodResult(SimpleRegionObserver.class,
-        new String[] {"getCtPreOpen", "getCtPostOpen", "getCtPreClose", "getCtPostClose"},
-        TEST_TABLE,
-        new Integer[] {1, 1, 0, 0});
-
-    Get get = new Get(ROW);
-    get.addColumn(A, A);
-    get.addColumn(B, B);
-    get.addColumn(C, C);
-    table.get(get);
-
-    verifyMethodResult(SimpleRegionObserver.class,
-        new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
-            "hadDelete"},
-        TEST_TABLE,
-        new Boolean[] {true, true, true, true, false}
-    );
-
-    Delete delete = new Delete(ROW);
-    delete.deleteColumn(A, A);
-    delete.deleteColumn(B, B);
-    delete.deleteColumn(C, C);
-    table.delete(delete);
+          );
 
-    verifyMethodResult(SimpleRegionObserver.class,
-        new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
-            "hadPreBatchMutate", "hadPostBatchMutate", "hadDelete"},
-        TEST_TABLE,
+      verifyMethodResult(SimpleRegionObserver.class,
+          new String[] {"getCtPreOpen", "getCtPostOpen", "getCtPreClose", "getCtPostClose"},
+          tableName,
+          new Integer[] {1, 1, 0, 0});
+
+      Get get = new Get(ROW);
+      get.addColumn(A, A);
+      get.addColumn(B, B);
+      get.addColumn(C, C);
+      table.get(get);
+
+      verifyMethodResult(SimpleRegionObserver.class,
+          new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
+      "hadDelete"},
+      tableName,
+      new Boolean[] {true, true, true, true, false}
+          );
+
+      Delete delete = new Delete(ROW);
+      delete.deleteColumn(A, A);
+      delete.deleteColumn(B, B);
+      delete.deleteColumn(C, C);
+      table.delete(delete);
+
+      verifyMethodResult(SimpleRegionObserver.class,
+          new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
+        "hadPreBatchMutate", "hadPostBatchMutate", "hadDelete"},
+        tableName,
         new Boolean[] {true, true, true, true, true, true, true}
-    );
-    util.deleteTable(tableName);
-    table.close();
-
+          );
+    } finally {
+      util.deleteTable(tableName);
+      table.close();
+    }
     verifyMethodResult(SimpleRegionObserver.class,
         new String[] {"getCtPreOpen", "getCtPostOpen", "getCtPreClose", "getCtPostClose"},
-        TEST_TABLE,
+        tableName,
         new Integer[] {1, 1, 1, 1});
   }
 
   @Test
   public void testRowMutation() throws IOException {
-    TableName tableName = TEST_TABLE;
+    TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + ".testRowMutation");
     HTable table = util.createTable(tableName, new byte[][] {A, B, C});
-    verifyMethodResult(SimpleRegionObserver.class,
+    try {
+      verifyMethodResult(SimpleRegionObserver.class,
         new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
             "hadDeleted"},
-        TEST_TABLE,
+        tableName,
         new Boolean[] {false, false, false, false, false});
-
-    Put put = new Put(ROW);
-    put.add(A, A, A);
-    put.add(B, B, B);
-    put.add(C, C, C);
-
-    Delete delete = new Delete(ROW);
-    delete.deleteColumn(A, A);
-    delete.deleteColumn(B, B);
-    delete.deleteColumn(C, C);
-
-    RowMutations arm = new RowMutations(ROW);
-    arm.add(put);
-    arm.add(delete);
-    table.mutateRow(arm);
-
-    verifyMethodResult(SimpleRegionObserver.class,
-        new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
-            "hadDeleted"},
-        TEST_TABLE,
-        new Boolean[] {false, false, true, true, true}
-    );
-    util.deleteTable(tableName);
-    table.close();
+      Put put = new Put(ROW);
+      put.add(A, A, A);
+      put.add(B, B, B);
+      put.add(C, C, C);
+
+      Delete delete = new Delete(ROW);
+      delete.deleteColumn(A, A);
+      delete.deleteColumn(B, B);
+      delete.deleteColumn(C, C);
+
+      RowMutations arm = new RowMutations(ROW);
+      arm.add(put);
+      arm.add(delete);
+      table.mutateRow(arm);
+
+      verifyMethodResult(SimpleRegionObserver.class,
+          new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
+      "hadDeleted"},
+      tableName,
+      new Boolean[] {false, false, true, true, true}
+          );
+    } finally {
+      util.deleteTable(tableName);
+      table.close();
+    }
   }
 
   @Test
   public void testIncrementHook() throws IOException {
-    TableName tableName = TEST_TABLE;
-
+    TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + ".testIncrementHook");
     HTable table = util.createTable(tableName, new byte[][] {A, B, C});
-    Increment inc = new Increment(Bytes.toBytes(0));
-    inc.addColumn(A, A, 1);
-
-    verifyMethodResult(SimpleRegionObserver.class,
-        new String[] {"hadPreIncrement", "hadPostIncrement"},
-        tableName,
-        new Boolean[] {false, false}
-    );
-
-    table.increment(inc);
+    try {
+      Increment inc = new Increment(Bytes.toBytes(0));
+      inc.addColumn(A, A, 1);
 
-    verifyMethodResult(SimpleRegionObserver.class,
-        new String[] {"hadPreIncrement", "hadPostIncrement"},
-        tableName,
-        new Boolean[] {true, true}
-    );
-    util.deleteTable(tableName);
-    table.close();
+      verifyMethodResult(SimpleRegionObserver.class,
+          new String[] {"hadPreIncrement", "hadPostIncrement"},
+          tableName,
+          new Boolean[] {false, false}
+          );
+
+      table.increment(inc);
+
+      verifyMethodResult(SimpleRegionObserver.class,
+          new String[] {"hadPreIncrement", "hadPostIncrement"},
+          tableName,
+          new Boolean[] {true, true}
+          );
+    } finally {
+      util.deleteTable(tableName);
+      table.close();
+    }
   }
 
   @Test
@@ -464,84 +469,82 @@ public class TestRegionObserverInterface
   @Test
   public void bulkLoadHFileTest() throws Exception {
     String testName = TestRegionObserverInterface.class.getName()+".bulkLoadHFileTest";
-    TableName tableName = TEST_TABLE;
+    TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + ".bulkLoadHFileTest");
     Configuration conf = util.getConfiguration();
     HTable table = util.createTable(tableName, new byte[][] {A, B, C});
-
-    verifyMethodResult(SimpleRegionObserver.class,
-        new String[] {"hadPreBulkLoadHFile", "hadPostBulkLoadHFile"},
-        tableName,
-        new Boolean[] {false, false}
-    );
-
-    FileSystem fs = util.getTestFileSystem();
-    final Path dir = util.getDataTestDirOnTestFS(testName).makeQualified(fs);
-    Path familyDir = new Path(dir, Bytes.toString(A));
-
-    createHFile(util.getConfiguration(), fs, new Path(familyDir,Bytes.toString(A)), A, A);
-
-    //Bulk load
-    new LoadIncrementalHFiles(conf).doBulkLoad(dir, new HTable(conf, tableName));
-
-    verifyMethodResult(SimpleRegionObserver.class,
-        new String[] {"hadPreBulkLoadHFile", "hadPostBulkLoadHFile"},
-        tableName,
-        new Boolean[] {true, true}
-    );
-    util.deleteTable(tableName);
-    table.close();
+    try {
+      verifyMethodResult(SimpleRegionObserver.class,
+          new String[] {"hadPreBulkLoadHFile", "hadPostBulkLoadHFile"},
+          tableName,
+          new Boolean[] {false, false}
+          );
+
+      FileSystem fs = util.getTestFileSystem();
+      final Path dir = util.getDataTestDirOnTestFS(testName).makeQualified(fs);
+      Path familyDir = new Path(dir, Bytes.toString(A));
+
+      createHFile(util.getConfiguration(), fs, new Path(familyDir,Bytes.toString(A)), A, A);
+
+      //Bulk load
+      new LoadIncrementalHFiles(conf).doBulkLoad(dir, new HTable(conf, tableName));
+
+      verifyMethodResult(SimpleRegionObserver.class,
+          new String[] {"hadPreBulkLoadHFile", "hadPostBulkLoadHFile"},
+          tableName,
+          new Boolean[] {true, true}
+          );
+    } finally {
+      util.deleteTable(tableName);
+      table.close();
+    }
   }
 
   @Test
   public void testRecovery() throws Exception {
-    LOG.info(TestRegionObserverInterface.class.getName()+".testRecovery");
-    TableName tableName = TEST_TABLE;
-
+    LOG.info(TestRegionObserverInterface.class.getName() +".testRecovery");
+    TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + ".testRecovery");
     HTable table = util.createTable(tableName, new byte[][] {A, B, C});
+    try {
+      JVMClusterUtil.RegionServerThread rs1 = cluster.startRegionServer();
+      ServerName sn2 = rs1.getRegionServer().getServerName();
+      String regEN = table.getRegionLocations().firstEntry().getKey().getEncodedName();
+
+      util.getHBaseAdmin().move(regEN.getBytes(), sn2.getServerName().getBytes());
+      while (!sn2.equals(table.getRegionLocations().firstEntry().getValue() )){
+        Thread.sleep(100);
+      }
 
-    JVMClusterUtil.RegionServerThread rs1 = cluster.startRegionServer();
-    ServerName sn2 = rs1.getRegionServer().getServerName();
-    String regEN = table.getRegionLocations().firstEntry().getKey().getEncodedName();
-
-    util.getHBaseAdmin().move(regEN.getBytes(), sn2.getServerName().getBytes());
-    while (!sn2.equals(table.getRegionLocations().firstEntry().getValue() )){
-      Thread.sleep(100);
-    }
-
-    Put put = new Put(ROW);
-    put.add(A, A, A);
-    put.add(B, B, B);
-    put.add(C, C, C);
-    table.put(put);
+      Put put = new Put(ROW);
+      put.add(A, A, A);
+      put.add(B, B, B);
+      put.add(C, C, C);
+      table.put(put);
 
-    verifyMethodResult(SimpleRegionObserver.class,
-        new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
-            "hadPreBatchMutate", "hadPostBatchMutate", "hadDelete"},
-        TEST_TABLE,
+      verifyMethodResult(SimpleRegionObserver.class,
+          new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
+        "hadPreBatchMutate", "hadPostBatchMutate", "hadDelete"},
+        tableName,
         new Boolean[] {false, false, true, true, true, true, false}
-    );
-
-    verifyMethodResult(SimpleRegionObserver.class,
-        new String[] {"getCtPreWALRestore", "getCtPostWALRestore", "getCtPrePut", "getCtPostPut"},
-        TEST_TABLE,
-        new Integer[] {0, 0, 1, 1});
-
-    cluster.killRegionServer(rs1.getRegionServer().getServerName());
-    Threads.sleep(20000); // just to be sure that the kill has fully started.
-    util.waitUntilAllRegionsAssigned(tableName);
+          );
 
-    verifyMethodResult(SimpleRegionObserver.class,
-        new String[]{"getCtPreWALRestore", "getCtPostWALRestore"},
-        TEST_TABLE,
-        new Integer[]{1, 1});
-
-    verifyMethodResult(SimpleRegionObserver.class,
-        new String[]{"getCtPrePut", "getCtPostPut"},
-        TEST_TABLE,
-        new Integer[]{0, 0});
-
-    util.deleteTable(tableName);
-    table.close();
+      verifyMethodResult(SimpleRegionObserver.class,
+          new String[] {"getCtPreWALRestore", "getCtPostWALRestore", "getCtPrePut", "getCtPostPut"},
+          tableName,
+          new Integer[] {0, 0, 1, 1});
+
+      cluster.killRegionServer(rs1.getRegionServer().getServerName());
+      Threads.sleep(1000); // Let the kill soak in.
+      util.waitUntilAllRegionsAssigned(tableName);
+      LOG.info("All regions assigned");
+
+      verifyMethodResult(SimpleRegionObserver.class,
+          new String[]{"getCtPrePut", "getCtPostPut"},
+          tableName,
+          new Integer[]{0, 0});
+    } finally {
+      util.deleteTable(tableName);
+      table.close();
+    }
   }
 
   @Test

Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBufferChain.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBufferChain.java?rev=1543458&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBufferChain.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBufferChain.java Tue Nov 19 15:39:35 2013
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+
+@Category(SmallTests.class)
+public class TestBufferChain {
+  private File tmpFile;
+
+  private static final byte[][] HELLO_WORLD_CHUNKS = new byte[][] {
+      "hello".getBytes(Charsets.UTF_8),
+      " ".getBytes(Charsets.UTF_8),
+      "world".getBytes(Charsets.UTF_8)
+  };
+
+  @Before
+  public void setup() throws IOException {
+    tmpFile = File.createTempFile("TestBufferChain", "txt");
+  }
+
+  @After
+  public void teardown() {
+    tmpFile.delete();
+  }
+
+  @Test
+  public void testGetBackBytesWePutIn() {
+    ByteBuffer[] bufs = wrapArrays(HELLO_WORLD_CHUNKS);
+    BufferChain chain = new BufferChain(bufs);
+    assertTrue(Bytes.equals(Bytes.toBytes("hello world"), chain.getBytes()));
+  }
+
+  @Test
+  public void testChainChunkBiggerThanWholeArray() throws IOException {
+    ByteBuffer[] bufs = wrapArrays(HELLO_WORLD_CHUNKS);
+    BufferChain chain = new BufferChain(bufs);
+    writeAndVerify(chain, "hello world", 8192);
+    assertNoRemaining(bufs);
+  }
+
+  @Test
+  public void testChainChunkBiggerThanSomeArrays() throws IOException {
+    ByteBuffer[] bufs = wrapArrays(HELLO_WORLD_CHUNKS);
+    BufferChain chain = new BufferChain(bufs);
+    writeAndVerify(chain, "hello world", 3);
+    assertNoRemaining(bufs);
+  }
+
+  @Test
+  public void testLimitOffset() throws IOException {
+    ByteBuffer[] bufs = new ByteBuffer[] {
+        stringBuf("XXXhelloYYY", 3, 5),
+        stringBuf(" ", 0, 1),
+        stringBuf("XXXXworldY", 4, 5) };
+    BufferChain chain = new BufferChain(bufs);
+    writeAndVerify(chain , "hello world", 3);
+    assertNoRemaining(bufs);
+  }
+
+  @Test
+  public void testWithSpy() throws IOException {
+    ByteBuffer[] bufs = new ByteBuffer[] {
+        stringBuf("XXXhelloYYY", 3, 5),
+        stringBuf(" ", 0, 1),
+        stringBuf("XXXXworldY", 4, 5) };
+    BufferChain chain = new BufferChain(bufs);
+    FileOutputStream fos = new FileOutputStream(tmpFile);
+    FileChannel ch = Mockito.spy(fos.getChannel());
+    try {
+      chain.write(ch, 2);
+      assertEquals("he", Files.toString(tmpFile, Charsets.UTF_8));
+      chain.write(ch, 2);
+      assertEquals("hell", Files.toString(tmpFile, Charsets.UTF_8));
+      chain.write(ch, 3);
+      assertEquals("hello w", Files.toString(tmpFile, Charsets.UTF_8));
+      chain.write(ch, 8);
+      assertEquals("hello world", Files.toString(tmpFile, Charsets.UTF_8));
+    } finally {
+      ch.close();
+      fos.close();
+    }
+  }
+
+  private ByteBuffer stringBuf(String string, int position, int length) {
+    ByteBuffer buf = ByteBuffer.wrap(string.getBytes(Charsets.UTF_8));
+    buf.position(position);
+    buf.limit(position + length);
+    assertTrue(buf.hasRemaining());
+    return buf;
+  }
+
+  private void assertNoRemaining(ByteBuffer[] bufs) {
+    for (ByteBuffer buf : bufs) {
+      assertFalse(buf.hasRemaining());
+    }
+  }
+
+  private ByteBuffer[] wrapArrays(byte[][] arrays) {
+    ByteBuffer[] ret = new ByteBuffer[arrays.length];
+    for (int i = 0; i < arrays.length; i++) {
+      ret[i] = ByteBuffer.wrap(arrays[i]);
+    }
+    return ret;
+  }
+
+  private void writeAndVerify(BufferChain chain, String string, int chunkSize)
+      throws IOException {
+    FileOutputStream fos = new FileOutputStream(tmpFile);
+    FileChannel ch = fos.getChannel();
+    try {
+      long remaining = string.length();
+      while (chain.hasRemaining()) {
+        long n = chain.write(ch, chunkSize);
+        assertTrue(n == chunkSize || n == remaining);
+        remaining -= n;
+      }
+      assertEquals(0, remaining);
+    } finally {
+      fos.close();
+    }
+    assertFalse(chain.hasRemaining());
+    assertEquals(string, Files.toString(tmpFile, Charsets.UTF_8));
+  }
+}
\ No newline at end of file

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java?rev=1543458&r1=1543457&r2=1543458&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java Tue Nov 19 15:39:35 2013
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.CellScann
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.SmallTests;
@@ -61,6 +62,8 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
@@ -76,6 +79,7 @@ import org.mockito.stubbing.Answer;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.protobuf.BlockingService;
+import com.google.protobuf.ByteString;
 import com.google.protobuf.Descriptors.MethodDescriptor;
 import com.google.protobuf.Message;
 import com.google.protobuf.RpcController;
@@ -89,6 +93,8 @@ public class TestIPC {
   public static final Log LOG = LogFactory.getLog(TestIPC.class);
   static byte [] CELL_BYTES =  Bytes.toBytes("xyz");
   static Cell CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);
+  static byte [] BIG_CELL_BYTES = new byte [10 * 1024];
+  static Cell BIG_CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, BIG_CELL_BYTES);
   private final static Configuration CONF = HBaseConfiguration.create();
   // We are using the test TestRpcServiceProtos generated classes and Service because they are
   // available and basic with methods like 'echo', and ping.  Below we make a blocking service
@@ -303,8 +309,10 @@ public class TestIPC {
     int cellcount = Integer.parseInt(args[1]);
     Configuration conf = HBaseConfiguration.create();
     TestRpcServer rpcServer = new TestRpcServer();
+    MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
+    EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
     RpcClient client = new RpcClient(conf, HConstants.CLUSTER_ID_DEFAULT);
-    KeyValue kv = KeyValueUtil.ensureKeyValue(CELL);
+    KeyValue kv = KeyValueUtil.ensureKeyValue(BIG_CELL);
     Put p = new Put(kv.getRow());
     for (int i = 0; i < cellcount; i++) {
       p.add(kv);
@@ -324,15 +332,17 @@ public class TestIPC {
           RegionAction.newBuilder(),
           ClientProtos.Action.newBuilder(),
           MutationProto.newBuilder());
-        CellScanner cellScanner = CellUtil.createCellScanner(cells);
-        if (i % 1000 == 0) {
+        builder.setRegion(RegionSpecifier.newBuilder().setType(RegionSpecifierType.REGION_NAME).
+          setValue(ByteString.copyFrom(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes())));
+        if (i % 100000 == 0) {
           LOG.info("" + i);
           // Uncomment this for a thread dump every so often.
           // ReflectionUtils.printThreadInfo(new PrintWriter(System.out),
           //  "Thread dump " + Thread.currentThread().getName());
         }
+        CellScanner cellScanner = CellUtil.createCellScanner(cells);
         Pair<Message, CellScanner> response =
-          client.call(null, builder.build(), cellScanner, null, user, address, 0);
+          client.call(md, builder.build(), cellScanner, param, user, address, 0);
         /*
         int count = 0;
         while (p.getSecond().advance()) {