You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2013/03/20 20:39:51 UTC

svn commit: r1459015 [6/8] - in /hbase/branches/0.95: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ hbase-client/src/test/...

Modified: hbase/branches/0.95/hbase-protocol/src/main/protobuf/Client.proto
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-protocol/src/main/protobuf/Client.proto?rev=1459015&r1=1459014&r2=1459015&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-protocol/src/main/protobuf/Client.proto (original)
+++ hbase/branches/0.95/hbase-protocol/src/main/protobuf/Client.proto Wed Mar 20 19:39:50 2013
@@ -51,7 +51,16 @@ message Get {
 }
 
 message Result {
-  repeated KeyValue keyValue = 1;
+  // Result includes the Cells or else it just has a count of Cells
+  // that are carried otherwise.
+  repeated Cell cell = 1;
+  // The below count is set when the associated cells are
+  // not part of this protobuf message; they are passed alongside
+  // and then this Message is just a placeholder with metadata.
+  // The count is needed to know how many to peel off the block of Cells as
+  // ours.  NOTE: This is different from the pb managed cellCount of the
+  // 'cell' field above which is non-null when the cells are pb'd.
+  optional int32 associatedCellCount = 2;
 }
 
 /**
@@ -118,24 +127,34 @@ message Condition {
   required Comparator comparator = 5;
 }
 
+
 /**
- * A specific mutate inside a mutate request.
+ * A specific mutation inside a mutate request.
  * It can be an append, increment, put or delete based
- * on the mutate type.
- */
-message Mutate {
-  required bytes row = 1;
-  required MutateType mutateType = 2;
+ * on the mutation type.  It can be fully filled in or
+ * have only metadata present because data is being carried
+ * elsewhere outside of pb.
+ */
+message MutationProto {
+  optional bytes row = 1;
+  optional MutationType mutateType = 2;
   repeated ColumnValue columnValue = 3;
-  repeated NameBytesPair attribute = 4;
-  optional uint64 timestamp = 5;
+  optional uint64 timestamp = 4;
+  repeated NameBytesPair attribute = 5;
   optional bool writeToWAL = 6 [default = true];
 
-  // For some mutate, result may be returned, in which case,
+  // For some mutations, a result may be returned, in which case,
   // time range can be specified for potential performance gain
-  optional TimeRange timeRange = 10;
+  optional TimeRange timeRange = 7;
+  // The below count is set when the associated cells are NOT
+  // part of this protobuf message; they are passed alongside
+  // and then this Message is a placeholder with metadata.  The
+  // count is needed to know how many to peel off the block of Cells as
+  // ours.  NOTE: This is different from the pb managed count of the
+  // 'columnValue' field above, which is non-empty when the cells are pb'd.
+  optional int32 associatedCellCount = 8;
 
-  enum MutateType {
+  enum MutationType {
     APPEND = 0;
     INCREMENT = 1;
     PUT = 2;
@@ -172,7 +191,7 @@ message Mutate {
  */
 message MutateRequest {
   required RegionSpecifier region = 1;
-  required Mutate mutate = 2;
+  required MutationProto mutation = 2;
   optional Condition condition = 3;
 }
 
@@ -281,7 +300,7 @@ message CoprocessorServiceResponse {
  * This is a union type - exactly one of the fields will be set.
  */
 message MultiAction {
-  optional Mutate mutate = 1;
+  optional MutationProto mutation = 1;
   optional Get get = 2;
 }
 

Modified: hbase/branches/0.95/hbase-protocol/src/main/protobuf/MultiRowMutation.proto
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-protocol/src/main/protobuf/MultiRowMutation.proto?rev=1459015&r1=1459014&r2=1459015&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-protocol/src/main/protobuf/MultiRowMutation.proto (original)
+++ hbase/branches/0.95/hbase-protocol/src/main/protobuf/MultiRowMutation.proto Wed Mar 20 19:39:50 2013
@@ -23,7 +23,7 @@ option java_generic_services = true;
 option optimize_for = SPEED;
 
 message MultiMutateRequest {
-  repeated Mutate mutationRequest = 1;
+  repeated MutationProto mutationRequest = 1;
 }
 
 message MultiMutateResponse {

Modified: hbase/branches/0.95/hbase-protocol/src/main/protobuf/RPC.proto
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-protocol/src/main/protobuf/RPC.proto?rev=1459015&r1=1459014&r2=1459015&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-protocol/src/main/protobuf/RPC.proto (original)
+++ hbase/branches/0.95/hbase-protocol/src/main/protobuf/RPC.proto Wed Mar 20 19:39:50 2013
@@ -15,123 +15,117 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-/**
- * Specification of (unsecure) HBase RPC:
- *
- * Client needs to set up a connection first to a server serving a certain
- * HBase protocol (like ClientProtocol). Once the connection is set up, the 
- * client and server communicates on that channel for RPC requests/responses.
- * The sections below describe the flow. 
- * 
- * As part of setting up a connection to a server, the client needs to send
- * the ConnectionHeader header. At the data level, this looks like
- * <"hrpc"-bytearray><'5'[byte]><length-of-serialized-ConnectionHeader-obj[int32]><ConnectionHeader-object serialized>
- *
- * For every RPC that the client makes it needs to send the following
- * RpcRequestHeader and the RpcRequestBody. At the data level this looks like:
- *  <length-of-serialized-RpcRequestHeader + length-of-varint32-of-serialized-RpcRequestHeader + 
- *   length-of-serialized-RpcRequestBody + length-of-varint32-of-serialized-RpcRequestBody>
- *  <RpcRequestHeader [serialized using Message.writeDelimitedTo]>
- *  <RpcRequestBody [serialized using Message.writeDelimitedTo]>
- *
- * On a success, the server's protobuf response looks like
- *  <RpcResponseHeader-object [serialized using Message.writeDelimitedTo]>
- *  <RpcResponseBody-object [serialized using Message.writeDelimitedTo]>
- * On a failure, the server's protobuf response looks like
- *  <RpcResponseHeader-object [serialized using Message.writeDelimitedTo]>
- *  <RpcException-object [serialized using Message.writeDelimitedTo]>
- *
- * There is one special message that's sent from client to server -
- * the Ping message. At the data level, this is just the bytes corresponding
- *   to integer -1.
- */
-
 import "Tracing.proto"; 
+import "hbase.proto"; 
 
 option java_package = "org.apache.hadoop.hbase.protobuf.generated";
 option java_outer_classname = "RPCProtos";
 option java_generate_equals_and_hash = true;
 option optimize_for = SPEED;
 
+// See https://issues.apache.org/jira/browse/HBASE-7898 for high-level
+// description of RPC specification.
+//
+// On connection setup, the client sends six bytes of preamble -- a four
+// byte magic, a byte of version, and a byte of authentication type.
+//
+// We then send a "ConnectionHeader" protobuf of user information and the
+// 'protocol' or 'service' that is to be run over this connection as well as
+// info such as codecs and compression to use when we send cell blocks (see below).
+// This connection header protobuf is prefaced by an int that holds the length
+// of this connection header (this is NOT a varint).  The pb connection header
+// is sent with Message#writeTo.  The server throws an exception if it doesn't
+// like what it was sent noting what it is objecting to.  Otherwise, the server
+// says nothing and is open for business.
+//
+// Hereafter the client makes requests and the server returns responses.
+//
+// Requests look like this:
+//
+// <An int with the total length of the request>
+// <RequestHeader Message written out using Message#writeDelimitedTo>
+// <Optionally a Request Parameter Message written out using Message#writeDelimitedTo>
+// <Optionally a Cell block>
+//
+// ...where the Request Parameter Message is whatever the method name stipulated
+// in the RequestHeader expects; e.g. if the method is a get, then the pb
+// Request Message is a GetRequest; if it is a scan, a ScanRequest.  A block of
+// Cells optionally follows.  The presence of a Request param Message and/or a
+// block of Cells will be noted in the RequestHeader.
+//
+// Response is the mirror of the request:
+//
+// <An int with the total length of the response>
+// <ResponseHeader Message written out using Message#writeDelimitedTo>
+// <Optionally a Response Result Message written out using Message#writeDelimitedTo>
+// <Optionally a Cell block>
+//
+// ...where the Response Message is the response type that goes with the
+// method specified when making the request and the follow on Cell blocks may
+// or may not be there -- read the response header to find out if one is following.
+// If an exception was thrown, it will be included inside the Response Header.
+//
+// Any time we write a pb, we do it with Message#writeDelimitedTo EXCEPT when
+// the connection header is sent; this is prefaced by an int with its length
+// and the pb connection header is then written with Message#writeTo.
+//
+
+// User Information proto.  Included in ConnectionHeader on connection setup
 message UserInformation {
   required string effectiveUser = 1;
   optional string realUser = 2;
 }
 
+// This is sent on connection setup after the connection preamble is sent.
 message ConnectionHeader {
-  /** User Info beyond what is established at connection establishment
-   *  (applies to secure HBase setup)
-   */
   optional UserInformation userInfo = 1;
-  /** Protocol name for next rpc layer
-   * the client created a proxy with this protocol name
-   */
   optional string protocol = 2 [default = "org.apache.hadoop.hbase.client.ClientProtocol"];
-}
-
-
-/**
- * The RPC request header
- */
-message RpcRequestHeader {
-  /** Monotonically increasing callId, mostly to keep track of RPCs */
-  required uint32 callId = 1;
-  optional RPCTInfo tinfo = 2;
-}
-/**
- * The RPC request body
- */
-message RpcRequestBody {
-  /** Name of the RPC method */
-  required string methodName = 1;
-
-  /** Bytes corresponding to the client protobuf request. This is the actual
-   *  bytes corresponding to the RPC request argument.
-   */
-  optional bytes request = 2;
-
-  /** Some metainfo about the request. Helps us to treat RPCs with
-   *  different priorities. For now this is just the classname of the request
-   *  proto object. 
-   */
-  optional string requestClassName = 4;
-}
-
-/**
- * The RPC response header
- */
-message RpcResponseHeader {
-  /** Echo back the callId the client sent */
-  required uint32 callId = 1;
-  /** Did the RPC execution encounter an error at the server */
-  enum Status {
-    SUCCESS = 0;
-    ERROR = 1;
-    FATAL = 2;
-  }
-  required Status status = 2;
-}
-/**
- * The RPC response body
- */
-message RpcResponseBody {
-   /** Optional response bytes. This is the actual bytes corresponding to the
-    *  return value of the invoked RPC.
-    */
-  optional bytes response = 1;
-}
-/**
- * At the RPC layer, this message is used to indicate
- * the server side exception to the RPC client.
- *
- * HBase RPC client throws an exception indicated
- * by exceptionName with the stackTrace.
- */
-message RpcException {
-  /** Class name of the exception thrown from the server */
-  required string exceptionName = 1;
-
-  /** Exception stack trace from the server side */
+  // Cell block codec we will use when sending over optional cell blocks.  Server throws an
+  // exception if it cannot deal with the codec.
+  optional string cellBlockCodecClass = 3 [default = "org.apache.hadoop.hbase.codec.KeyValueCodec"];
+  // Compressor we will use if cell block is compressed.  Server will throw exception if not supported.
+  // Class must implement hadoop's CompressionCodec Interface
+  optional string cellBlockCompressorClass = 4;
+}
+
+// Optional Cell block Message.  Included in the RequestHeader and the ResponseHeader
+message CellBlockMeta {
+  // Length of the following cell block.  Could calculate it but convenient having it to hand.
+  optional uint32 length = 1;
+}
+
+// At the RPC layer, this message is used to carry
+// the server side exception to the RPC client.
+message ExceptionResponse {
+  // Class name of the exception thrown from the server
+  optional string exceptionClassName = 1;
+  // Exception stack trace from the server side
   optional string stackTrace = 2;
+  // Optional hostname.  Filled in for some exceptions such as region moved
+  // where exception gives clue on where the region may have moved.
+  optional string hostname = 3;
+  optional int32 port = 4;
+  // Set if we are NOT to retry on receipt of this exception
+  optional bool doNotRetry = 5;
+}
+
+// Header sent making a request.
+message RequestHeader {
+  // Monotonically increasing callId to keep track of RPC requests and their response
+  optional uint32 callId = 1;
+  optional RPCTInfo traceInfo = 2;
+  optional string methodName = 3;
+  // If true, then a pb Message param follows.
+  optional bool requestParam = 4;
+  // If present, then an encoded data block follows.
+  optional CellBlockMeta cellBlockMeta = 5; 
+  // TODO: Have client specify priority
+}
+
+message ResponseHeader {
+  optional uint32 callId = 1;
+  // If present, then the request threw an exception and no response message follows (else we presume one)
+  optional ExceptionResponse exception = 2;
+  // If present, then an encoded data block follows.
+  optional CellBlockMeta cellBlockMeta = 3; 
 }

Modified: hbase/branches/0.95/hbase-protocol/src/main/protobuf/hbase.proto
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-protocol/src/main/protobuf/hbase.proto?rev=1459015&r1=1459014&r2=1459015&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-protocol/src/main/protobuf/hbase.proto (original)
+++ hbase/branches/0.95/hbase-protocol/src/main/protobuf/hbase.proto Wed Mar 20 19:39:50 2013
@@ -24,6 +24,33 @@ option java_generate_equals_and_hash = t
 option optimize_for = SPEED;
 
 /**
+ * The type of the key in a Cell
+ */
+enum CellType {
+    MINIMUM = 0;
+    PUT = 4;
+
+    DELETE = 8;
+    DELETE_COLUMN = 12;
+    DELETE_FAMILY = 14;
+
+    // MAXIMUM is used when searching; you look from maximum on down.
+    MAXIMUM = 255;
+}
+
+/**
+ * Protocol buffer version of Cell.
+ */
+message Cell {
+  optional bytes row = 1;
+  optional bytes family = 2;
+  optional bytes qualifier = 3;
+  optional uint64 timestamp = 4;
+  optional CellType cellType = 5;
+  optional bytes value = 6;
+}
+
+/**
  * Table Schema
  * Inspired by the rest TableSchema
  */
@@ -201,21 +228,6 @@ enum CompareType {
 }
 
 /**
- * The type of the key in a KeyValue.
- */
-enum KeyType {
-    MINIMUM = 0;
-    PUT = 4;
-
-    DELETE = 8;
-    DELETE_COLUMN = 12;
-    DELETE_FAMILY = 14;
-
-    // MAXIMUM is used when searching; you look from maximum on down.
-    MAXIMUM = 255;
-}
-
-/**
  * Protocol buffer version of KeyValue.
  * It doesn't have those transient parameters
  */
@@ -224,7 +236,7 @@ message KeyValue {
   required bytes family = 2;
   required bytes qualifier = 3;
   optional uint64 timestamp = 4;
-  optional KeyType keyType = 5;
+  optional CellType keyType = 5;
   optional bytes value = 6;
 }
 
@@ -288,4 +300,4 @@ message LongMsg {
 
 message BigDecimalMsg {
   required bytes bigdecimalMsg = 1;
-}
\ No newline at end of file
+}

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java?rev=1459015&r1=1459014&r2=1459015&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java Wed Mar 20 19:39:50 2013
@@ -36,7 +36,7 @@ import org.apache.hadoop.hbase.client.Mu
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.MutateType;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
 import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutation.MultiMutateRequest;
 import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutation.MultiRowMutationService;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -301,7 +301,7 @@ public class MetaEditor {
     CoprocessorRpcChannel channel = table.coprocessorService(row);
     MultiMutateRequest.Builder mmrBuilder = MultiMutateRequest.newBuilder();
     for (Put put : puts) {
-      mmrBuilder.addMutationRequest(ProtobufUtil.toMutate(MutateType.PUT, put));
+      mmrBuilder.addMutationRequest(ProtobufUtil.toMutation(MutationType.PUT, put));
     }
 
     MultiRowMutationService.BlockingInterface service =

Added: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java?rev=1459015&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java (added)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java Wed Mar 20 19:39:50 2013
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.codec;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.codec.BaseDecoder;
+import org.apache.hadoop.hbase.codec.BaseEncoder;
+import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.codec.CodecException;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Codec that just writes out Cell as a protobuf Cell Message.  Does not write the mvcc stamp.
+ * Use a different codec if you want that in the stream.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class MessageCodec implements Codec {
+  static class MessageEncoder extends BaseEncoder {
+    MessageEncoder(final OutputStream out) {
+      super(out);
+    }
+
+    @Override
+    public void write(Cell cell) throws IOException {
+      checkFlushed();
+      HBaseProtos.Cell.Builder builder = HBaseProtos.Cell.newBuilder();
+      // This copies bytes from Cell to ByteString.  I don't see any way around the copy.
+      // ByteString is final.
+      builder.setRow(ByteString.copyFrom(cell.getRowArray(), cell.getRowOffset(),
+          cell.getRowLength()));
+      builder.setFamily(ByteString.copyFrom(cell.getFamilyArray(), cell.getFamilyOffset(),
+          cell.getFamilyLength()));
+      builder.setQualifier(ByteString.copyFrom(cell.getQualifierArray(), cell.getQualifierOffset(),
+          cell.getQualifierLength()));
+      builder.setTimestamp(cell.getTimestamp());
+      builder.setCellType(HBaseProtos.CellType.valueOf(cell.getTypeByte()));
+      builder.setValue(ByteString.copyFrom(cell.getValueArray(), cell.getValueOffset(),
+          cell.getValueLength()));
+      HBaseProtos.Cell pbcell = builder.build();
+      try {
+        pbcell.writeDelimitedTo(this.out);
+      } catch (IOException e) {
+        throw new CodecException(e);
+      }
+    }
+  }
+
+  static class MessageDecoder extends BaseDecoder {
+    MessageDecoder(final InputStream in) {
+      super(in);
+    }
+
+    protected Cell parseCell() throws IOException {
+      HBaseProtos.Cell pbcell = HBaseProtos.Cell.parseDelimitedFrom(this.in);
+      return CellUtil.createCell(pbcell.getRow().toByteArray(),
+        pbcell.getFamily().toByteArray(), pbcell.getQualifier().toByteArray(),
+        pbcell.getTimestamp(), (byte)pbcell.getCellType().getNumber(),
+        pbcell.getValue().toByteArray());
+    }
+  }
+
+  @Override
+  public Decoder getDecoder(InputStream is) {
+    return new MessageDecoder(is);
+  }
+
+  @Override
+  public Encoder getEncoder(OutputStream os) {
+    return new MessageEncoder(os);
+  }
+}
\ No newline at end of file

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java?rev=1459015&r1=1459014&r2=1459015&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java Wed Mar 20 19:39:50 2013
@@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.exception
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
 import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutation.MultiMutateRequest;
 import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutation.MultiMutateResponse;
 import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutation.MultiRowMutationService;
@@ -86,9 +86,9 @@ CoprocessorService, Coprocessor {
     try {
       // set of rows to lock, sorted to avoid deadlocks
       SortedSet<byte[]> rowsToLock = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
-      List<Mutate> mutateRequestList = request.getMutationRequestList();
+      List<MutationProto> mutateRequestList = request.getMutationRequestList();
       List<Mutation> mutations = new ArrayList<Mutation>(mutateRequestList.size());
-      for (Mutate m : mutateRequestList) {
+      for (MutationProto m : mutateRequestList) {
         mutations.add(ProtobufUtil.toMutation(m));
       }
 

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1459015&r1=1459014&r2=1459015&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Wed Mar 20 19:39:50 2013
@@ -23,9 +23,9 @@ import static org.apache.hadoop.fs.Commo
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.net.BindException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -66,19 +66,22 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.IpcProtocol;
+import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.exceptions.CallerDisconnectedException;
+import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
+import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.exceptions.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestHeader;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader.Status;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
 import org.apache.hadoop.hbase.security.AuthMethod;
@@ -87,11 +90,13 @@ import org.apache.hadoop.hbase.security.
 import org.apache.hadoop.hbase.security.SaslStatus;
 import org.apache.hadoop.hbase.security.SaslUtil;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.ipc.RPC.VersionMismatch;
+import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -112,7 +117,10 @@ import org.cloudera.htrace.impl.NullSpan
 
 import com.google.common.base.Function;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.CodedInputStream;
 import com.google.protobuf.Message;
+import com.google.protobuf.Message.Builder;
+import com.google.protobuf.TextFormat;
 // Uses Writables doing sasl
 
 /** A client for an IPC service.  IPC calls take a single Protobuf message as a
@@ -126,9 +134,12 @@ import com.google.protobuf.Message;
  */
 @InterfaceAudience.Private
 public abstract class HBaseServer implements RpcServer {
+  public static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.HBaseServer");
   private final boolean authorize;
   protected boolean isSecurityEnabled;
 
+  public static final byte CURRENT_VERSION = 0;
+
   /**
    * How many calls/handler are allowed in the queue.
    */
@@ -150,11 +161,7 @@ public abstract class HBaseServer implem
   private final int warnDelayedCalls;
 
   private AtomicInteger delayedCalls;
-
-  public static final Log LOG =
-    LogFactory.getLog("org.apache.hadoop.ipc.HBaseServer");
-  protected static final Log TRACELOG =
-      LogFactory.getLog("org.apache.hadoop.ipc.HBaseServer.trace");
+  private final IPCUtil ipcUtil;
 
   private static final String AUTH_FAILED_FOR = "Auth failed for ";
   private static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
@@ -166,6 +173,7 @@ public abstract class HBaseServer implem
   protected static final ThreadLocal<RpcServer> SERVER =
     new ThreadLocal<RpcServer>();
   private volatile boolean started = false;
+  private static final ReflectionCache methodCache = new ReflectionCache();
 
   private static final Map<String, Class<? extends IpcProtocol>> PROTOCOL_CACHE =
       new ConcurrentHashMap<String, Class<? extends IpcProtocol>>();
@@ -307,7 +315,10 @@ public abstract class HBaseServer implem
   /** A call queued for handling. */
   protected class Call implements RpcCallContext {
     protected int id;                             // the client's call id
-    protected RpcRequestBody rpcRequestBody;                     // the parameter passed
+    protected Method method;
+    protected Message param;                      // the parameter passed
+    // Optional cell data passed outside of protobufs.
+    protected CellScanner cellScanner;
     protected Connection connection;              // connection to client
     protected long timestamp;      // the time received when response is null
                                    // the time served when response is not null
@@ -320,10 +331,12 @@ public abstract class HBaseServer implem
     protected boolean isError;
     protected TraceInfo tinfo;
 
-    public Call(int id, RpcRequestBody rpcRequestBody, Connection connection,
-        Responder responder, long size, TraceInfo tinfo) {
+    public Call(int id, Method method, Message param, CellScanner cellScanner,
+        Connection connection, Responder responder, long size, TraceInfo tinfo) {
       this.id = id;
-      this.rpcRequestBody = rpcRequestBody;
+      this.method = method;
+      this.param = param;
+      this.cellScanner = cellScanner;
       this.connection = connection;
       this.timestamp = System.currentTimeMillis();
       this.response = null;
@@ -336,57 +349,74 @@ public abstract class HBaseServer implem
 
     @Override
     public String toString() {
-      return rpcRequestBody.toString() + " from " + connection.toString();
+      return "callId: " + this.id + " methodName: " +
+        ((this.method != null)? this.method.getName(): null) + " param: " +
+        (this.param != null? TextFormat.shortDebugString(this.param): "") +
+        " from " + connection.toString();
     }
 
     protected synchronized void setSaslTokenResponse(ByteBuffer response) {
       this.response = response;
     }
 
-    protected synchronized void setResponse(Object value, Status status,
-        String errorClass, String error) {
-      if (this.isError)
-        return;
-      if (errorClass != null) {
-        this.isError = true;
-      }
-
-      ByteBufferOutputStream buf = null;
-      if (value != null) {
-        buf = new ByteBufferOutputStream(((Message)value).getSerializedSize());
-      } else {
-        buf = new ByteBufferOutputStream(BUFFER_INITIAL_SIZE);
-      }
-      DataOutputStream out = new DataOutputStream(buf);
+    protected synchronized void setResponse(Object m, final CellScanner cells,
+        Throwable t, String errorMsg) {
+      if (this.isError) return;
+      if (t != null) this.isError = true;
+      ByteBufferOutputStream bbos = null;
       try {
-        RpcResponseHeader.Builder builder = RpcResponseHeader.newBuilder();
+        ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
+        // Presume it a pb Message.  Could be null.
+        Message result = (Message)m;
         // Call id.
-        builder.setCallId(this.id);
-        builder.setStatus(status);
-        builder.build().writeDelimitedTo(out);
-        if (error != null) {
-          RpcException.Builder b = RpcException.newBuilder();
-          b.setExceptionName(errorClass);
-          b.setStackTrace(error);
-          b.build().writeDelimitedTo(out);
-        } else {
-          if (value != null) {
-            ((Message)value).writeDelimitedTo(out);
-          }
+        headerBuilder.setCallId(this.id);
+        if (t != null) {
+          ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
+          exceptionBuilder.setExceptionClassName(t.getClass().getName());
+          exceptionBuilder.setStackTrace(errorMsg);
+          exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException);
+          if (t instanceof RegionMovedException) {
+            // Special casing for this exception.  This is the only one carrying a payload.
+            // Do this instead of building a generic system for allowing exceptions to carry
+            // any kind of payload.
+            RegionMovedException rme = (RegionMovedException)t;
+            exceptionBuilder.setHostname(rme.getHostname());
+            exceptionBuilder.setPort(rme.getPort());
+          }
+          // Set the exception as the result of the method invocation.
+          headerBuilder.setException(exceptionBuilder.build());
+        }
+        ByteBuffer cellBlock =
+          ipcUtil.buildCellBlock(this.connection.codec, this.connection.compressionCodec, cells);
+        if (cellBlock != null) {
+          CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
+          // Presumes the cellBlock bytebuffer has been flipped so limit has total size in it.
+          cellBlockBuilder.setLength(cellBlock.limit());
+          headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
         }
+        Message header = headerBuilder.build();
+        bbos = IPCUtil.write(header, result, cellBlock);
         if (connection.useWrap) {
-          wrapWithSasl(buf);
+          wrapWithSasl(bbos);
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Header " + TextFormat.shortDebugString(header) +
+            ", result " + (result != null? TextFormat.shortDebugString(result): "null"));
         }
       } catch (IOException e) {
         LOG.warn("Exception while creating response " + e);
       }
-      ByteBuffer bb = buf.getByteBuffer();
-      bb.position(0);
+      ByteBuffer bb = null;
+      if (bbos != null) {
+        // TODO: If SASL, maybe buffer already been flipped and written?
+        bb = bbos.getByteBuffer();
+        bb.position(0);
+      }
       this.response = bb;
     }
 
     private void wrapWithSasl(ByteBufferOutputStream response)
-        throws IOException {
+    throws IOException {
       if (connection.useSasl) {
         // getByteBuffer calls flip()
         ByteBuffer buf = response.getByteBuffer();
@@ -413,8 +443,9 @@ public abstract class HBaseServer implem
       assert this.delayReturnValue || result == null;
       this.delayResponse = false;
       delayedCalls.decrementAndGet();
-      if (this.delayReturnValue)
-        this.setResponse(result, Status.SUCCESS, null, null);
+      if (this.delayReturnValue) {
+        this.setResponse(result, null, null, null);
+      }
       this.responder.doRespond(this);
     }
 
@@ -437,8 +468,7 @@ public abstract class HBaseServer implem
 
     @Override
     public synchronized void endDelayThrowing(Throwable t) throws IOException {
-      this.setResponse(null, Status.ERROR, t.getClass().toString(),
-          StringUtils.stringifyException(t));
+      this.setResponse(null, null, t, StringUtils.stringifyException(t));
       this.delayResponse = false;
       this.sendResponseIfReady();
     }
@@ -517,7 +547,7 @@ public abstract class HBaseServer implem
         readers[i] = reader;
         readPool.execute(reader);
       }
-      LOG.info("Started " + readThreads + " reader(s) in Listener.");
+      LOG.info(getName() + ": started " + readThreads + " reader(s) in Listener.");
 
       // Register accepts on the server socket with the selector.
       acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
@@ -540,7 +570,7 @@ public abstract class HBaseServer implem
           try {
             readSelector.close();
           } catch (IOException ioe) {
-            LOG.error("Error closing read selector in " + getName(), ioe);
+            LOG.error(getName() + ": error closing read selector in " + getName(), ioe);
           }
         }
       }
@@ -567,11 +597,11 @@ public abstract class HBaseServer implem
             }
           } catch (InterruptedException e) {
             if (running) {                      // unexpected -- log it
-              LOG.info(getName() + " unexpectedly interrupted: " +
-                  StringUtils.stringifyException(e));
+              LOG.info(getName() + ": unexpectedly interrupted: " +
+                StringUtils.stringifyException(e));
             }
           } catch (IOException ex) {
-            LOG.error("Error in Reader", ex);
+            LOG.error(getName() + ": error in Reader", ex);
           }
         }
       }
@@ -674,7 +704,7 @@ public abstract class HBaseServer implem
         } catch (OutOfMemoryError e) {
           if (errorHandler != null) {
             if (errorHandler.checkOOME(e)) {
-              LOG.info(getName() + ": exiting on OOME");
+              LOG.info(getName() + ": exiting on OutOfMemoryError");
               closeCurrentConnection(key, e);
               cleanupConnections(true);
               return;
@@ -683,7 +713,7 @@ public abstract class HBaseServer implem
             // we can run out of memory if we have too many threads
             // log the event and sleep for a minute and give
             // some thread(s) a chance to finish
-            LOG.warn("Out of Memory in server select", e);
+            LOG.warn(getName() + ": OutOfMemoryError in server select", e);
             closeCurrentConnection(key, e);
             cleanupConnections(true);
             try { Thread.sleep(60000); } catch (Exception ignored) {}
@@ -693,7 +723,7 @@ public abstract class HBaseServer implem
         }
         cleanupConnections(false);
       }
-      LOG.info("Stopping " + this.getName());
+      LOG.info(getName() + ": stopping");
 
       synchronized (this) {
         try {
@@ -750,7 +780,7 @@ public abstract class HBaseServer implem
             numConnections++;
           }
           if (LOG.isDebugEnabled())
-            LOG.debug("Server connection from " + c.toString() +
+            LOG.debug(getName() + ": connection from " + c.toString() +
                 "; # active connections: " + numConnections +
                 "; # queued calls: " + callQueue.size());
         } finally {
@@ -766,24 +796,23 @@ public abstract class HBaseServer implem
         return;
       }
       c.setLastContact(System.currentTimeMillis());
-
       try {
         count = c.readAndProcess();
       } catch (InterruptedException ieo) {
         throw ieo;
       } catch (Exception e) {
-        LOG.warn(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count, e);
+        LOG.warn(getName() + ": count of bytes read: " + count, e);
         count = -1; //so that the (count < 0) block is executed
       }
       if (count < 0) {
-        if (LOG.isDebugEnabled())
-          LOG.debug(getName() + ": disconnecting client " +
-                    c.getHostAddress() + ". Number of active connections: "+
-                    numConnections);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(getName() + ": disconnecting client " + c.getHostAddress() +
+            ", because count=" + count +
+            ". Number of active connections: " + numConnections);
+        }
         closeConnection(c);
         // c = null;
-      }
-      else {
+      } else {
         c.setLastContact(System.currentTimeMillis());
       }
     }
@@ -797,7 +826,7 @@ public abstract class HBaseServer implem
         try {
           acceptChannel.socket().close();
         } catch (IOException e) {
-          LOG.info(getName() + ":Exception in closing listener socket. " + e);
+          LOG.info(getName() + ": exception in closing listener socket. " + e);
         }
       }
       readPool.shutdownNow();
@@ -830,11 +859,11 @@ public abstract class HBaseServer implem
       try {
         doRunLoop();
       } finally {
-        LOG.info("Stopping " + this.getName());
+        LOG.info(getName() + ": stopping");
         try {
           writeSelector.close();
         } catch (IOException ioe) {
-          LOG.error("Couldn't close write selector in " + this.getName(), ioe);
+          LOG.error(getName() + ": couldn't close write selector", ioe);
         }
       }
     }
@@ -855,7 +884,7 @@ public abstract class HBaseServer implem
                   doAsyncWrite(key);
               }
             } catch (IOException e) {
-              LOG.info(getName() + ": doAsyncWrite threw exception " + e);
+              LOG.info(getName() + ": asyncWrite", e);
             }
           }
           long now = System.currentTimeMillis();
@@ -867,7 +896,7 @@ public abstract class HBaseServer implem
           // If there were some calls that have not been sent out for a
           // long time, discard them.
           //
-          LOG.debug("Checking for old call responses.");
+          if (LOG.isDebugEnabled()) LOG.debug(getName() + ": checking for old call responses.");
           ArrayList<Call> calls;
 
           // get the list of channels from list of keys.
@@ -887,13 +916,13 @@ public abstract class HBaseServer implem
             try {
               doPurge(call, now);
             } catch (IOException e) {
-              LOG.warn("Error in purging old calls " + e);
+              LOG.warn(getName() + ": error in purging old calls " + e);
             }
           }
         } catch (OutOfMemoryError e) {
           if (errorHandler != null) {
             if (errorHandler.checkOOME(e)) {
-              LOG.info(getName() + ": exiting on OOME");
+              LOG.info(getName() + ": exiting on OutOfMemoryError");
               return;
             }
           } else {
@@ -902,15 +931,15 @@ public abstract class HBaseServer implem
             // log the event and sleep for a minute and give
             // some thread(s) a chance to finish
             //
-            LOG.warn("Out of Memory in server select", e);
+            LOG.warn(getName() + ": OutOfMemoryError in server select", e);
             try { Thread.sleep(60000); } catch (Exception ignored) {}
           }
         } catch (Exception e) {
-          LOG.warn("Exception in Responder " +
+          LOG.warn(getName() + ": exception in Responder " +
                    StringUtils.stringifyException(e));
         }
       }
-      LOG.info("Stopping " + this.getName());
+      LOG.info(getName() + ": stopped");
     }
 
     private void doAsyncWrite(SelectionKey key) throws IOException {
@@ -958,8 +987,8 @@ public abstract class HBaseServer implem
     // Processes one response. Returns true if there are no more pending
     // data for this channel.
     //
-    private boolean processResponse(final LinkedList<Call> responseQueue,
-                                    boolean inHandler) throws IOException {
+    private boolean processResponse(final LinkedList<Call> responseQueue, boolean inHandler)
+    throws IOException {
       boolean error = true;
       boolean done = false;       // there is more data for this channel.
       int numElements;
@@ -980,10 +1009,6 @@ public abstract class HBaseServer implem
           //
           call = responseQueue.removeFirst();
           SocketChannel channel = call.connection.channel;
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(getName() + ": responding to #" + call.id + " from " +
-                      call.connection);
-          }
           //
           // Send as much data as we can in the non-blocking fashion
           //
@@ -1000,8 +1025,8 @@ public abstract class HBaseServer implem
               done = false;            // more calls pending to be sent.
             }
             if (LOG.isDebugEnabled()) {
-              LOG.debug(getName() + ": responding to #" + call.id + " from " +
-                        call.connection + " Wrote " + numBytes + " bytes.");
+              LOG.debug(getName() + ": callId: " + call.id + " sent, wrote " + numBytes +
+                " bytes.");
             }
           } else {
             //
@@ -1017,16 +1042,15 @@ public abstract class HBaseServer implem
                 done = true;
             }
             if (LOG.isDebugEnabled()) {
-              LOG.debug(getName() + ": responding to #" + call.id + " from " +
-                        call.connection + " Wrote partial " + numBytes +
-                        " bytes.");
+              LOG.debug(getName() + call.toString() + " partially sent, wrote " +
+                numBytes + " bytes.");
             }
           }
           error = false;              // everything went off well
         }
       } finally {
         if (error && call != null) {
-          LOG.warn(getName()+", call " + call + ": output error");
+          LOG.warn(getName() + call.toString() + ": output error");
           done = true;               // error. no more data for this channel.
           closeConnection(call.connection);
         }
@@ -1090,56 +1114,90 @@ public abstract class HBaseServer implem
     }
   }
 
+  /**
+   * IOException subtype named for an oversized call queue.
+   * NOTE(review): presumably thrown when the server's call queue is over its size limit;
+   * the throw site is not part of this hunk -- confirm.
+   */
+  @SuppressWarnings("serial")
+  public static class CallQueueTooBigException extends IOException {
+    // Package-private and message-less: the implicit super() call yields an IOException
+    // with no detail message and no cause.
+    CallQueueTooBigException() {}
+  }
+
+  private Function<Pair<RequestHeader, Message>, Integer> qosFunction = null;
+
+  /**
+   * Installs the function used to compute the QOS level of a call.  If the level it returns
+   * is higher than the highPriorityLevel and there are priorityHandlers available, the call
+   * will be processed in its own thread set.
+   *
+   * @param newFunc maps a call's request header and param to its QOS level; stored as-is, so
+   *   it may be null, in which case {@link #getQosLevel(Pair)} returns 0
+   */
+  @Override
+  public void setQosFunction(Function<Pair<RequestHeader, Message>, Integer> newFunc) {
+    qosFunction = newFunc;
+  }
+
+  /**
+   * Computes the QOS level of a call by applying the configured QOS function.
+   *
+   * @param headerAndParam the request header and param of the call to classify
+   * @return the level the QOS function produced, or 0 when no function has been set or the
+   *   function returned null
+   */
+  protected int getQosLevel(Pair<RequestHeader, Message> headerAndParam) {
+    Integer level = null;
+    if (qosFunction != null) {
+      level = qosFunction.apply(headerAndParam);
+    }
+    // Fall back to level 0 when there is no function or it produced no answer.
+    return (level != null) ? level.intValue() : 0;
+  }
+
   /** Reads calls from a connection and queues them for handling. */
   public class Connection {
-    private boolean rpcHeaderRead = false; //if initial signature and
-                                         //version are read
-    private boolean headerRead = false;  //if the connection header that
-                                         //follows version is read.
+    // If initial preamble with version and magic has been read or not.
+    private boolean connectionPreambleRead = false;
+    // If the connection header has been read or not.
+    private boolean connectionHeaderRead = false;
     protected SocketChannel channel;
     private ByteBuffer data;
     private ByteBuffer dataLengthBuffer;
     protected final LinkedList<Call> responseQueue;
     private volatile int rpcCount = 0; // number of outstanding rpcs
     private long lastContact;
-    private int dataLength;
     private InetAddress addr;
     protected Socket socket;
     // Cache the remote host & port info so that even if the socket is
     // disconnected, we can say where it used to connect to.
     protected String hostAddress;
     protected int remotePort;
-    ConnectionHeader header;
+    ConnectionHeader connectionHeader;
+    /**
+     * Codec the client asked us to use.
+     */
+    private Codec codec;
+    /**
+     * Compression codec the client asked us to use.
+     */
+    private CompressionCodec compressionCodec;
     Class<? extends IpcProtocol> protocol;
     protected UserGroupInformation user = null;
     private AuthMethod authMethod;
     private boolean saslContextEstablished;
     private boolean skipInitialSaslHandshake;
-    private ByteBuffer rpcHeaderBuffer;
     private ByteBuffer unwrappedData;
+    // When is this set?  FindBugs wants to know!  Says NP
     private ByteBuffer unwrappedDataLengthBuffer;
     boolean useSasl;
     SaslServer saslServer;
     private boolean useWrap = false;
     // Fake 'call' for failed authorization response
     private static final int AUTHROIZATION_FAILED_CALLID = -1;
-    private final Call authFailedCall = new Call(AUTHROIZATION_FAILED_CALLID,
-        null, this, null, 0, null);
+    private final Call authFailedCall =
+      new Call(AUTHROIZATION_FAILED_CALLID, null, null, null, this, null, 0, null);
     private ByteArrayOutputStream authFailedResponse =
         new ByteArrayOutputStream();
     // Fake 'call' for SASL context setup
     private static final int SASL_CALLID = -33;
-    private final Call saslCall = new Call(SASL_CALLID, null, this, null, 0,
-        null);
+    private final Call saslCall =
+      new Call(SASL_CALLID, null, null, null, this, null, 0, null);
 
     public UserGroupInformation attemptingUser = null; // user name before auth
+
     public Connection(SocketChannel channel, long lastContact) {
       this.channel = channel;
       this.lastContact = lastContact;
       this.data = null;
       this.dataLengthBuffer = ByteBuffer.allocate(4);
       this.socket = channel.socket();
-      InetAddress addr = socket.getInetAddress();
+      this.addr = socket.getInetAddress();
       if (addr == null) {
         this.hostAddress = "*Unknown*";
       } else {
@@ -1251,8 +1309,9 @@ public abstract class HBaseServer implem
               UserGroupInformation current = UserGroupInformation
               .getCurrentUser();
               String fullName = current.getUserName();
-              if (LOG.isDebugEnabled())
+              if (LOG.isDebugEnabled()) {
                 LOG.debug("Kerberos principal name is " + fullName);
+              }
               final String names[] = SaslUtil.splitKerberosName(fullName);
               if (names.length != 3) {
                 throw new AccessControlException(
@@ -1273,13 +1332,14 @@ public abstract class HBaseServer implem
               throw new AccessControlException(
                   "Unable to find SASL server implementation for "
                       + authMethod.getMechanismName());
-            if (LOG.isDebugEnabled())
-              LOG.debug("Created SASL server with mechanism = "
-                  + authMethod.getMechanismName());
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Created SASL server with mechanism = " + authMethod.getMechanismName());
+            }
           }
-          if (LOG.isDebugEnabled())
+          if (LOG.isDebugEnabled()) {
             LOG.debug("Have read input token of size " + saslToken.length
                 + " for processing by saslServer.evaluateResponse()");
+          }
           replyToken = saslServer.evaluateResponse(saslToken);
         } catch (IOException e) {
           IOException sendToClient = e;
@@ -1300,9 +1360,10 @@ public abstract class HBaseServer implem
           throw e;
         }
         if (replyToken != null) {
-          if (LOG.isDebugEnabled())
+          if (LOG.isDebugEnabled()) {
             LOG.debug("Will send token of size " + replyToken.length
                 + " from saslServer.");
+          }
           doRawSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null,
               null);
         }
@@ -1352,49 +1413,52 @@ public abstract class HBaseServer implem
       }
     }
 
+    /**
+     * Read off the wire.
+     * @return Returns -1 if failure (and caller will close connection) else return how many
+     * bytes were read and processed
+     * @throws IOException
+     * @throws InterruptedException
+     */
     public int readAndProcess() throws IOException, InterruptedException {
       while (true) {
-        /* Read at most one RPC. If the header is not read completely yet
-         * then iterate until we read first RPC or until there is no data left.
-         */
-        int count = -1;
-        if (dataLengthBuffer.remaining() > 0) {
-          count = channelRead(channel, dataLengthBuffer);
-          if (count < 0 || dataLengthBuffer.remaining() > 0)
+        // Try and read in an int.  If new connection, the int will hold the 'HBas' HEADER.  If it
+        // does, read in the rest of the connection preamble, the version and the auth method.
+        // Else it will be length of the data to read (or -1 if a ping).  We catch the integer
+        // length into the 4-byte this.dataLengthBuffer.
+        int count;
+        if (this.dataLengthBuffer.remaining() > 0) {
+          count = channelRead(channel, this.dataLengthBuffer);
+          if (count < 0 || this.dataLengthBuffer.remaining() > 0) {
             return count;
-        }
-
-        if (!rpcHeaderRead) {
-          //Every connection is expected to send the header.
-          if (rpcHeaderBuffer == null) {
-            rpcHeaderBuffer = ByteBuffer.allocate(2);
           }
-          count = channelRead(channel, rpcHeaderBuffer);
-          if (count < 0 || rpcHeaderBuffer.remaining() > 0) {
+        }
+        // If we have not read the connection setup preamble, look to see if that is on the wire.
+        if (!connectionPreambleRead) {
+          // Check for 'HBas' magic.
+          this.dataLengthBuffer.flip();
+          if (!HConstants.RPC_HEADER.equals(dataLengthBuffer)) {
+            return doBadPreambleHandling("Expected HEADER=" +
+              Bytes.toStringBinary(HConstants.RPC_HEADER.array()) +
+              " but received HEADER=" + Bytes.toStringBinary(dataLengthBuffer.array()));
+          }
+          // Now read the next two bytes, the version and the auth to use.
+          ByteBuffer versionAndAuthBytes = ByteBuffer.allocate(2);
+          count = channelRead(channel, versionAndAuthBytes);
+          if (count < 0 || versionAndAuthBytes.remaining() > 0) {
             return count;
           }
-          int version = rpcHeaderBuffer.get(0);
-          byte[] method = new byte[] {rpcHeaderBuffer.get(1)};
-          authMethod = AuthMethod.read(new DataInputStream(
-              new ByteArrayInputStream(method)));
-          dataLengthBuffer.flip();
-          if (!HConstants.RPC_HEADER.equals(dataLengthBuffer) || version != HConstants.CURRENT_VERSION) {
-              LOG.warn("Incorrect header or version mismatch from " +
-                  hostAddress + ":" + remotePort +
-                  " got version " + version +
-                  " expected version " + HConstants.CURRENT_VERSION);
-            setupBadVersionResponse(version);
-            return -1;
-          }
-          dataLengthBuffer.clear();
-          if (authMethod == null) {
-            throw new IOException("Unable to read authentication method");
+          int version = versionAndAuthBytes.get(0);
+          byte authbyte = versionAndAuthBytes.get(1);
+          this.authMethod = AuthMethod.valueOf(authbyte);
+          if (version != CURRENT_VERSION || authMethod == null) {
+            return doBadPreambleHandling("serverVersion=" + CURRENT_VERSION +
+              ", clientVersion=" + version + ", authMethod=" + authbyte +
+              ", authSupported=" + (authMethod != null));
           }
           if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
-            AccessControlException ae = new AccessControlException(
-                "Authentication is required");
-            setupResponse(authFailedResponse, authFailedCall, Status.FATAL,
-                ae.getClass().getName(), ae.getMessage());
+            AccessControlException ae = new AccessControlException("Authentication is required");
+            setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
             responder.doRespond(authFailedCall);
             throw ae;
           }
@@ -1410,18 +1474,18 @@ public abstract class HBaseServer implem
           if (authMethod != AuthMethod.SIMPLE) {
             useSasl = true;
           }
-
-          rpcHeaderBuffer = null;
-          rpcHeaderRead = true;
+          connectionPreambleRead = true;
+          // Preamble checks out. Go around again to read actual connection header.
+          dataLengthBuffer.clear();
           continue;
         }
-
+        // We have read a length and we have read the preamble.  It is either the connection header
+        // or it is a request.
         if (data == null) {
           dataLengthBuffer.flip();
-          dataLength = dataLengthBuffer.getInt();
-
+          int dataLength = dataLengthBuffer.getInt();
           if (dataLength == HBaseClient.PING_CALL_ID) {
-            if(!useWrap) { //covers the !useSasl too
+            if (!useWrap) { //covers the !useSasl too
               dataLengthBuffer.clear();
               return 0;  //ping message
             }
@@ -1433,9 +1497,7 @@ public abstract class HBaseServer implem
           data = ByteBuffer.allocate(dataLength);
           incRpcCount();  // Increment the rpc count
         }
-
         count = channelRead(channel, data);
-
         if (data.remaining() == 0) {
           dataLengthBuffer.clear();
           data.flip();
@@ -1444,65 +1506,49 @@ public abstract class HBaseServer implem
             skipInitialSaslHandshake = false;
             continue;
           }
-          boolean isHeaderRead = headerRead;
+          boolean headerRead = connectionHeaderRead;
           if (useSasl) {
             saslReadAndProcess(data.array());
           } else {
             processOneRpc(data.array());
           }
-          data = null;
-          if (!isHeaderRead) {
+          this.data = null;
+          if (!headerRead) {
             continue;
           }
+        } else {
+          // More to read still; go around again.
+          if (LOG.isTraceEnabled()) LOG.trace("Continue to read rest of data " + data.remaining());
+          continue;
         }
         return count;
       }
     }
 
-    /**
-     * Try to set up the response to indicate that the client version
-     * is incompatible with the server. This can contain special-case
-     * code to speak enough of past IPC protocols to pass back
-     * an exception to the caller.
-     * @param clientVersion the version the caller is using
-     * @throws IOException
-     */
-    private void setupBadVersionResponse(int clientVersion) throws IOException {
-      String errMsg = "Server IPC version " + HConstants.CURRENT_VERSION +
-      " cannot communicate with client version " + clientVersion;
-      ByteArrayOutputStream buffer = new ByteArrayOutputStream();
-
-      if (clientVersion >= 3) {
-        // We used to return an id of -1 which caused server to close the
-        // connection without telling the client what the problem was.  Now
-        // we return 0 which will keep the socket up -- bad clients, unless
-        // they switch to suit the running server -- will fail later doing
-        // getProtocolVersion.
-        Call fakeCall = new Call(0, null, this, responder, 0, null);
-        // Versions 3 and greater can interpret this exception
-        // response in the same manner
-        setupResponse(buffer, fakeCall, Status.FATAL,
-            VersionMismatch.class.getName(), errMsg);
-
-        responder.doRespond(fakeCall);
-      }
+    /**
+     * Handles a connection preamble that failed validation: logs the problem, queues a
+     * fatal-exception response for the client, and tells the caller to drop the connection.
+     *
+     * @param errMsg description of what was wrong with the preamble
+     * @return always -1, so that the caller closes out the connection
+     * @throws IOException as declared; presumably raised while setting up or queueing the
+     *   response -- confirm against setupResponse/doRespond
+     */
+    private int doBadPreambleHandling(final String errMsg) throws IOException {
+      // Report the client's own port (remotePort, cached on this Connection next to
+      // hostAddress) so the message really identifies the remote end.
+      String msg = errMsg + "; cannot communicate with client at " + hostAddress + ":" +
+        remotePort;
+      LOG.warn(msg);
+      // Fake call (id -1) that exists only to carry the exception back to the client.
+      Call fakeCall = new Call(-1, null, null, null, this, responder, -1, null);
+      setupResponse(null, fakeCall, new FatalConnectionException(msg), msg);
+      responder.doRespond(fakeCall);
+      // Returning -1 closes out the connection.
+      return -1;
     }
 
-    /// Reads the connection header following version
-    private void processHeader(byte[] buf) throws IOException {
-      DataInputStream in =
-        new DataInputStream(new ByteArrayInputStream(buf));
-      header = ConnectionHeader.parseFrom(in);
+    // Reads the connection header following version
+    private void processConnectionHeader(byte[] buf) throws IOException {
+      this.connectionHeader = ConnectionHeader.parseFrom(buf);
       try {
-        String protocolClassName = header.getProtocol();
+        String protocolClassName = connectionHeader.getProtocol();
         if (protocolClassName != null) {
-          protocol = getProtocolClass(header.getProtocol(), conf);
+          protocol = getProtocolClass(connectionHeader.getProtocol(), conf);
         }
       } catch (ClassNotFoundException cnfe) {
-        throw new IOException("Unknown protocol: " + header.getProtocol());
+        throw new IOException("Unknown protocol: " + connectionHeader.getProtocol());
       }
+      setupCellBlockCodecs(this.connectionHeader);
 
-      UserGroupInformation protocolUser = createUser(header);
+      UserGroupInformation protocolUser = createUser(connectionHeader);
       if (!useSasl) {
         user = protocolUser;
         if (user != null) {
@@ -1535,6 +1581,30 @@ public abstract class HBaseServer implem
       }
     }
 
+    /**
+     * Sets up the cell block codec and, if the client named one, the cell block compression
+     * codec for this connection, instantiating each reflectively from the class name carried
+     * in the connection header.
+     *
+     * @param header connection header sent by the client
+     * @throws FatalConnectionException if the header names no codec class, or if a named
+     *   codec or compressor class cannot be loaded, instantiated or cast
+     */
+    private void setupCellBlockCodecs(final ConnectionHeader header)
+    throws FatalConnectionException {
+      // TODO: Plug in other supported decoders.
+      // NOTE(review): both class names below are taken from the client's connection header
+      // and handed to Class.forName(...).newInstance(); consider checking them against a
+      // list of known codec classes before instantiating.
+      if (!header.hasCellBlockCodecClass()) throw new FatalConnectionException("No codec");
+      String className = header.getCellBlockCodecClass();
+      try {
+        this.codec = (Codec)Class.forName(className).newInstance();
+      } catch (Exception e) {
+        throw new FatalConnectionException("Unsupported codec " + className, e);
+      }
+      // The compressor is optional; leave compressionCodec untouched when none was asked for.
+      if (!header.hasCellBlockCompressorClass()) return;
+      className = header.getCellBlockCompressorClass();
+      try {
+        this.compressionCodec = (CompressionCodec)Class.forName(className).newInstance();
+      } catch (Exception e) {
+        // Name the compressor explicitly so this failure is not mistaken for a codec one.
+        throw new FatalConnectionException("Unsupported compression codec " + className, e);
+      }
+    }
+
     private void processUnwrappedData(byte[] inBuf) throws IOException,
     InterruptedException {
       ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(
@@ -1576,74 +1646,103 @@ public abstract class HBaseServer implem
 
     private void processOneRpc(byte[] buf) throws IOException,
     InterruptedException {
-      if (headerRead) {
-        processData(buf);
+      if (connectionHeaderRead) {
+        processRequest(buf);
       } else {
-        processHeader(buf);
-        headerRead = true;
+        processConnectionHeader(buf);
+        this.connectionHeaderRead = true;
         if (!authorizeConnection()) {
           throw new AccessControlException("Connection from " + this
-              + " for protocol " + header.getProtocol()
+              + " for protocol " + connectionHeader.getProtocol()
               + " is unauthorized for user " + user);
         }
       }
     }
 
-    protected void processData(byte[] buf) throws  IOException, InterruptedException {
-      DataInputStream dis =
-        new DataInputStream(new ByteArrayInputStream(buf));
-      RpcRequestHeader request = RpcRequestHeader.parseDelimitedFrom(dis);
-
-      int id = request.getCallId();
-      long callSize = buf.length;
-
+    /**
+     * @param buf Has the request header and the request param and optionally encoded data buffer
+     * all in this one array.
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    protected void processRequest(byte[] buf) throws IOException, InterruptedException {
+      long totalRequestSize = buf.length;
+      int offset = 0;
+      // Here we read in the header.  We avoid having pb
+      // do its default 4k allocation for CodedInputStream.  We force it to use backing array.
+      CodedInputStream cis = CodedInputStream.newInstance(buf, offset, buf.length);
+      int headerSize = cis.readRawVarint32();
+      offset = cis.getTotalBytesRead();
+      RequestHeader header =
+        RequestHeader.newBuilder().mergeFrom(buf, offset, headerSize).build();
+      offset += headerSize;
+      int id = header.getCallId();
       if (LOG.isDebugEnabled()) {
-        LOG.debug(" got call #" + id + ", " + callSize + " bytes");
+        LOG.debug("RequestHeader " + TextFormat.shortDebugString(header) +
+          " totalRequestSize: " + totalRequestSize + " bytes");
       }
       // Enforcing the call queue size, this triggers a retry in the client
-      if ((callSize + callQueueSize.get()) > maxQueueSize) {
-        final Call callTooBig = new Call(id, null, this, responder, callSize,
-            null);
+      // This is a bit late to be doing this check - we have already read in the total request.
+      if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) {
+        final Call callTooBig =
+          new Call(id, null, null, null, this, responder, totalRequestSize, null);
         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
-        setupResponse(responseBuffer, callTooBig, Status.FATAL,
-            IOException.class.getName(),
-            "Call queue is full, is ipc.server.max.callqueue.size too small?");
+        setupResponse(responseBuffer, callTooBig, new CallQueueTooBigException(),
+          "Call queue is full, is ipc.server.max.callqueue.size too small?");
         responder.doRespond(callTooBig);
         return;
       }
-
-      RpcRequestBody rpcRequestBody;
+      Method method = null;
+      Message param = null;
+      CellScanner cellScanner = null;
       try {
-        rpcRequestBody = RpcRequestBody.parseDelimitedFrom(dis);
+        if (header.hasRequestParam() && header.getRequestParam()) {
+          method = methodCache.getMethod(this.protocol, header.getMethodName());
+          Message m = methodCache.getMethodArgType(method);
+          // Check that there is a param to deserialize.
+          if (m != null) {
+            Builder builder = null;
+            builder = m.newBuilderForType();
+            // To read the varint, I need an inputstream; might as well be a CIS.
+            cis = CodedInputStream.newInstance(buf, offset, buf.length);
+            int paramSize = cis.readRawVarint32();
+            offset += cis.getTotalBytesRead();
+            if (builder != null) {
+              builder.mergeFrom(buf, offset, paramSize);
+              param = builder.build();
+            }
+            offset += paramSize;
+          }
+        }
+        if (header.hasCellBlockMeta()) {
+          cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec,
+            buf, offset, buf.length);
+        }
       } catch (Throwable t) {
-        LOG.warn("Unable to read call parameters for client " +
-                 getHostAddress(), t);
-        final Call readParamsFailedCall = new Call(id, null, this, responder,
-            callSize, null);
+        String msg = "Unable to read call parameter from client " + getHostAddress();
+        LOG.warn(msg, t);
+        final Call readParamsFailedCall =
+          new Call(id, null, null, null, this, responder, totalRequestSize, null);
         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
-
-        setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL,
-            t.getClass().getName(),
-            "IPC server unable to read call parameters: " + t.getMessage());
+        setupResponse(responseBuffer, readParamsFailedCall, t,
+          msg + "; " + t.getMessage());
         responder.doRespond(readParamsFailedCall);
         return;
       }
 
-      Call call;
-      if (request.hasTinfo()) {
-        call = new Call(id, rpcRequestBody, this, responder, callSize,
-            new TraceInfo(request.getTinfo().getTraceId(), request.getTinfo()
-                .getParentId()));
+      Call call = null;
+      if (header.hasTraceInfo()) {
+        call = new Call(id, method, param, cellScanner, this, responder, totalRequestSize,
+          new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId()));
       } else {
-        call = new Call(id, rpcRequestBody, this, responder, callSize, null);
+        call = new Call(id, method, param, cellScanner, this, responder, totalRequestSize, null);
       }
-
-      callQueueSize.add(callSize);
-
-      if (priorityCallQueue != null && getQosLevel(rpcRequestBody) > highPriorityLevel) {
+      callQueueSize.add(totalRequestSize);
+      Pair<RequestHeader, Message> headerAndParam = new Pair<RequestHeader, Message>(header, param);
+      if (priorityCallQueue != null && getQosLevel(headerAndParam) > highPriorityLevel) {
         priorityCallQueue.put(call);
-      } else if (replicationQueue != null
-          && getQosLevel(rpcRequestBody) == HConstants.REPLICATION_QOS) {
+      } else if (replicationQueue != null &&
+          getQosLevel(headerAndParam) == HConstants.REPLICATION_QOS) {
         replicationQueue.put(call);
       } else {
         callQueue.put(call);              // queue the call; maybe blocked here
@@ -1660,16 +1759,15 @@ public abstract class HBaseServer implem
             && (authMethod != AuthMethod.DIGEST)) {
           ProxyUsers.authorize(user, this.getHostAddress(), conf);
         }
-        authorize(user, header, getHostInetAddress());
+        authorize(user, connectionHeader, getHostInetAddress());
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Successfully authorized " + header);
+          LOG.debug("Authorized " + TextFormat.shortDebugString(connectionHeader));
         }
         metrics.authorizationSuccess();
       } catch (AuthorizationException ae) {
-        LOG.debug("Connection authorization failed: "+ae.getMessage(), ae);
+        LOG.debug("Connection authorization failed: " + ae.getMessage(), ae);
         metrics.authorizationFailure();
-        setupResponse(authFailedResponse, authFailedCall, Status.FATAL,
-            ae.getClass().getName(), ae.getMessage());
+        setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
         responder.doRespond(authFailedCall);
         return false;
       }
@@ -1679,7 +1777,7 @@ public abstract class HBaseServer implem
     protected synchronized void close() {
       disposeSasl();
       data = null;
-      dataLengthBuffer = null;
+      this.dataLengthBuffer = null;
       if (!channel.isOpen())
         return;
       try {socket.shutdownOutput();} catch(Exception ignored) {} // FindBugs DE_MIGHT_IGNORE
@@ -1743,49 +1841,38 @@ public abstract class HBaseServer implem
       status.setStatus("starting");
       SERVER.set(HBaseServer.this);
       while (running) {
-
         try {
           status.pause("Waiting for a call");
           Call call = myCallQueue.take(); // pop the queue; maybe blocked here
           status.setStatus("Setting up call");
-          status.setConnection(call.connection.getHostAddress(),
-              call.connection.getRemotePort());
-
-          if (LOG.isDebugEnabled())
-            LOG.debug(getName() + ": has #" + call.id + " from " +
-                      call.connection);
-
-          String errorClass = null;
+          status.setConnection(call.connection.getHostAddress(), call.connection.getRemotePort());
+          if (LOG.isDebugEnabled()) {
+            UserGroupInformation remoteUser = call.connection.user;
+            LOG.debug(call.toString() + " executing as " +
+              ((remoteUser == null)? "NULL principal": remoteUser.getUserName()));
+          }
+          Throwable errorThrowable = null;
           String error = null;
-          Message value = null;
-
+          Pair<Message, CellScanner> resultPair = null;
           CurCall.set(call);
           Span currentRequestSpan = NullSpan.getInstance();
           try {
-            if (!started)
+            if (!started) {
               throw new ServerNotRunningYetException("Server is not running yet");
-
+            }
             if (call.tinfo != null) {
               currentRequestSpan = Trace.startSpan(
                   "handling " + call.toString(), call.tinfo, Sampler.ALWAYS);
             }
-
-            if (LOG.isDebugEnabled()) {
-              UserGroupInformation remoteUser = call.connection.user;
-              LOG.debug(getName() + ": call #" + call.id + " executing as "
-                  + (remoteUser == null ? "NULL principal" :
-                    remoteUser.getUserName()));
-            }
-
             RequestContext.set(User.create(call.connection.user), getRemoteIp(),
-                call.connection.protocol);
+              call.connection.protocol);
 
             // make the call
-            value = call(call.connection.protocol, call.rpcRequestBody, call.timestamp,
-                status);
+            resultPair = call(call.connection.protocol, call.method, call.param, call.cellScanner,
+              call.timestamp, status);
           } catch (Throwable e) {
-            LOG.debug(getName()+", call "+call+": error: " + e, e);
-            errorClass = e.getClass().getName();
+            LOG.debug(getName() + ": " + call.toString() + " error: " + e, e);
+            errorThrowable = e;
             error = StringUtils.stringifyException(e);
           } finally {
             currentRequestSpan.stop();
@@ -1798,21 +1885,20 @@ public abstract class HBaseServer implem
           // Set the response for undelayed calls and delayed calls with
           // undelayed responses.
           if (!call.isDelayed() || !call.isReturnValueDelayed()) {
-            call.setResponse(value,
-              errorClass == null? Status.SUCCESS: Status.ERROR,
-                errorClass, error);
+            Message param = resultPair != null? resultPair.getFirst(): null;
+            CellScanner cells = resultPair != null? resultPair.getSecond(): null;
+            call.setResponse(param, cells, errorThrowable, error);
           }
           call.sendResponseIfReady();
           status.markComplete("Sent response");
         } catch (InterruptedException e) {
           if (running) {                          // unexpected -- log it
-            LOG.info(getName() + " caught: " +
-                     StringUtils.stringifyException(e));
+            LOG.info(getName() + ": caught: " + StringUtils.stringifyException(e));
           }
         } catch (OutOfMemoryError e) {
           if (errorHandler != null) {
             if (errorHandler.checkOOME(e)) {
-              LOG.info(getName() + ": exiting on OOME");
+              LOG.info(getName() + ": exiting on OutOfMemoryError");
               return;
             }
           } else {
@@ -1820,44 +1906,16 @@ public abstract class HBaseServer implem
             throw e;
           }
        } catch (ClosedChannelException cce) {
-          LOG.warn(getName() + " caught a ClosedChannelException, " +
+          LOG.warn(getName() + ": caught a ClosedChannelException, " +
             "this means that the server was processing a " +
             "request but the client went away. The error message was: " +
             cce.getMessage());
         } catch (Exception e) {
-          LOG.warn(getName() + " caught: " +
-                   StringUtils.stringifyException(e));
+          LOG.warn(getName() + ": caught: " + StringUtils.stringifyException(e));
         }
       }
       LOG.info(getName() + ": exiting");
     }
-
-  }
-
-
-  private Function<RpcRequestBody,Integer> qosFunction = null;
-
-  /**
-   * Gets the QOS level for this call.  If it is higher than the highPriorityLevel and there
-   * are priorityHandlers available it will be processed in it's own thread set.
-   *
-   * @param newFunc
-   */
-  @Override
-  public void setQosFunction(Function<RpcRequestBody, Integer> newFunc) {
-    qosFunction = newFunc;
-  }
-
-  protected int getQosLevel(RpcRequestBody rpcRequestBody) {
-    if (qosFunction == null) {
-      return 0;
-    }
-
-    Integer res = qosFunction.apply(rpcRequestBody);
-    if (res == null) {
-      return 0;
-    }
-    return res;
   }
 
   /* Constructs a server listening on the named port and address.  Parameters passed must
@@ -1913,6 +1971,7 @@ public abstract class HBaseServer implem
     this.warnDelayedCalls = conf.getInt(WARN_DELAYED_CALLS,
                                         DEFAULT_WARN_DELAYED_CALLS);
     this.delayedCalls = new AtomicInteger(0);
+    this.ipcUtil = new IPCUtil(conf);
 
 
     // Create the responder here
@@ -1943,12 +2002,10 @@ public abstract class HBaseServer implem
    * @param error error message, if the call failed
    * @throws IOException
    */
-  private void setupResponse(ByteArrayOutputStream response,
-                             Call call, Status status,
-                             String errorClass, String error)
+  private void setupResponse(ByteArrayOutputStream response, Call call, Throwable t, String error)
   throws IOException {
-    response.reset();
-    call.setResponse(null, status, errorClass, error);
+    if (response != null) response.reset();
+    call.setResponse(null, null, t, error);
   }
 
   protected void closeConnection(Connection connection) {
@@ -2088,6 +2145,7 @@ public abstract class HBaseServer implem
    * @param addr InetAddress of incoming connection
    * @throws org.apache.hadoop.security.authorize.AuthorizationException when the client isn't authorized to talk the protocol
    */
+  @SuppressWarnings("static-access")
   public void authorize(UserGroupInformation user,
                         ConnectionHeader connection,
                         InetAddress addr

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcServerEngine.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcServerEngine.java?rev=1459015&r1=1459014&r2=1459015&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcServerEngine.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcServerEngine.java Wed Mar 20 19:39:50 2013
@@ -23,36 +23,33 @@ import java.lang.reflect.InvocationTarge
 import java.lang.reflect.Method;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.IpcProtocol;
 import org.apache.hadoop.hbase.client.Operation;
 import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.security.HBasePolicyProvider;
 import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 import org.codehaus.jackson.map.ObjectMapper;
 
 import com.google.protobuf.Message;
 import com.google.protobuf.ServiceException;
+import com.google.protobuf.TextFormat;
 /**
  * The {@link RpcServerEngine} implementation for ProtoBuf-based RPCs.
  */
 @InterfaceAudience.Private
 class ProtobufRpcServerEngine implements RpcServerEngine {
-  private static final Log LOG =
-      LogFactory.getLog("org.apache.hadoop.hbase.ipc.ProtobufRpcServerEngine");
-
   ProtobufRpcServerEngine() {
     super();
   }
@@ -66,7 +63,6 @@ class ProtobufRpcServerEngine implements
         metaHandlerCount, verbose, highPriorityLevel);
   }
 
-
   public static class Server extends HBaseServer {
     boolean verbose;
     Object instance;
@@ -111,10 +107,6 @@ class ProtobufRpcServerEngine implements
       this.instance = instance;
       this.implementation = instance.getClass();
     }
-    private static final Map<String, Message> methodArg =
-        new ConcurrentHashMap<String, Message>();
-    private static final Map<String, Method> methodInstances =
-        new ConcurrentHashMap<String, Method>();
 
     private AuthenticationTokenSecretManager createSecretManager(){
       if (!isSecurityEnabled ||
@@ -152,37 +144,20 @@ class ProtobufRpcServerEngine implements
      * the return response has protobuf response payload. On failure, the
      * exception name and the stack trace are returned in the protobuf response.
      */
-    public Message call(Class<? extends IpcProtocol> protocol,
-      RpcRequestBody rpcRequest, long receiveTime, MonitoredRPCHandler status)
+    public Pair<Message, CellScanner> call(Class<? extends IpcProtocol> protocol,
+      Method method, Message param, CellScanner cellScanner, long receiveTime,
+      MonitoredRPCHandler status)
     throws IOException {
       try {
-        String methodName = rpcRequest.getMethodName();
-        Method method = getMethod(protocol, methodName);
-        if (method == null) {
-          throw new UnknownProtocolException("Method " + methodName +
-              " doesn't exist in protocol " + protocol.getName());
-        }
-
-        /**
-         * RPCs for a particular interface (ie protocol) are done using a
-         * IPC connection that is setup using rpcProxy.
-         * The rpcProxy's has a declared protocol name that is
-         * sent form client to server at connection time.
-         */
-
         if (verbose) {
-          LOG.info("Call: protocol name=" + protocol.getName() +
-              ", method=" + methodName);
+          LOG.info("callId: " + CurCall.get().id + " protocol: " + protocol.getName() +
+            " method: " + method.getName());
         }
-
-        status.setRPC(rpcRequest.getMethodName(),
-            new Object[]{rpcRequest.getRequest()}, receiveTime);
-        status.setRPCPacket(rpcRequest);
+        status.setRPC(method.getName(), new Object[]{param}, receiveTime);
+        // TODO: Review after we add in encoded data blocks.
+        status.setRPCPacket(param);
         status.resume("Servicing call");
         //get an instance of the method arg type
-        Message protoType = getMethodArgType(method);
-        Message param = protoType.newBuilderForType()
-            .mergeFrom(rpcRequest.getRequest()).build();
         Message result;
         Object impl = null;
         if (protocol.isAssignableFrom(this.implementation)) {
@@ -190,57 +165,53 @@ class ProtobufRpcServerEngine implements
         } else {
           throw new UnknownProtocolException(protocol);
         }
-
+        PayloadCarryingRpcController controller = null;
         long startTime = System.currentTimeMillis();
         if (method.getParameterTypes().length == 2) {
-          // RpcController + Message in the method args
-          // (generated code from RPC bits in .proto files have RpcController)
-          result = (Message)method.invoke(impl, null, param);
-        } else if (method.getParameterTypes().length == 1) {
-          // Message (hand written code usually has only a single argument)
-          result = (Message)method.invoke(impl, param);
+          // Always create a controller.  Some invocations may not pass data in but will pass
+          // data out and they'll need a controller instance to carry it for them.
+          controller = new PayloadCarryingRpcController(cellScanner);
+          result = (Message)method.invoke(impl, controller, param);
         } else {
-          throw new ServiceException("Too many parameters for method: ["
-              + method.getName() + "]" + ", allowed (at most): 2, Actual: "
-              + method.getParameterTypes().length);
+          throw new ServiceException("Wrong number of parameters for method: [" +
+            method.getName() + "]" + ", wanted: 2, actual: " + method.getParameterTypes().length);
         }
         int processingTime = (int) (System.currentTimeMillis() - startTime);
         int qTime = (int) (startTime-receiveTime);
-        if (TRACELOG.isDebugEnabled()) {
-          TRACELOG.debug("Call #" + CurCall.get().id +
-              "; served=" + protocol.getSimpleName() + "#" + method.getName() +
-              ", queueTime=" + qTime +
-              ", processingTime=" + processingTime +
-              ", request=" + param.toString() +
-              " response=" + result.toString());
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(CurCall.get().toString() +
+              " response: " + TextFormat.shortDebugString(result) +
+              " served: " + protocol.getSimpleName() +
+              " queueTime: " + qTime +
+              " processingTime: " + processingTime);
         }
         metrics.dequeuedCall(qTime);
         metrics.processedCall(processingTime);
-
         if (verbose) {
-          log("Return: "+result, LOG);
+          log("Return " + TextFormat.shortDebugString(result), LOG);
         }
         long responseSize = result.getSerializedSize();
         // log any RPC responses that are slower than the configured warn
         // response time or larger than configured warning size
-        boolean tooSlow = (processingTime > warnResponseTime
-            && warnResponseTime > -1);
-        boolean tooLarge = (responseSize > warnResponseSize
-            && warnResponseSize > -1);
+        boolean tooSlow = (processingTime > warnResponseTime && warnResponseTime > -1);
+        boolean tooLarge = (responseSize > warnResponseSize && warnResponseSize > -1);
         if (tooSlow || tooLarge) {
           // when tagging, we let TooLarge trump TooSmall to keep output simple
           // note that large responses will often also be slow.
+          // TODO: This output is useless.... output the serialized pb as toString but do a
+          // short form, shorter than TextFormat.shortDebugString(proto).
           StringBuilder buffer = new StringBuilder(256);
-          buffer.append(methodName);
+          buffer.append(method.getName());
           buffer.append("(");
           buffer.append(param.getClass().getName());
           buffer.append(")");
-          logResponse(new Object[]{rpcRequest.getRequest()},
-              methodName, buffer.toString(), (tooLarge ? "TooLarge" : "TooSlow"),
+          logResponse(new Object[]{param},
+              method.getName(), buffer.toString(), (tooLarge ? "TooLarge" : "TooSlow"),
               status.getClient(), startTime, processingTime, qTime,
               responseSize);
         }
-        return result;
+        return new Pair<Message, CellScanner>(result,
+          controller != null? controller.cellScanner(): null);
       } catch (InvocationTargetException e) {
         Throwable target = e.getTargetException();
         if (target instanceof IOException) {
@@ -262,48 +233,6 @@ class ProtobufRpcServerEngine implements
       }
     }
 
-    static Method getMethod(Class<? extends IpcProtocol> protocol,
-                            String methodName) {
-      Method method = methodInstances.get(methodName);
-      if (method != null) {
-        return method;
-      }
-      Method[] methods = protocol.getMethods();
-      for (Method m : methods) {
-        if (m.getName().equals(methodName)) {
-          m.setAccessible(true);
-          methodInstances.put(methodName, m);
-          return m;
-        }
-      }
-      return null;
-    }
-
-    static Message getMethodArgType(Method method) throws Exception {
-      Message protoType = methodArg.get(method.getName());
-      if (protoType != null) {
-        return protoType;
-      }
-
-      Class<?>[] args = method.getParameterTypes();
-      Class<?> arg;
-      if (args.length == 2) {
-        // RpcController + Message in the method args
-        // (generated code from RPC bits in .proto files have RpcController)
-        arg = args[1];
-      } else if (args.length == 1) {
-        arg = args[0];
-      } else {
-        //unexpected
-        return null;
-      }
-      //in the protobuf methods, args[1] is the only significant argument
-      Method newInstMethod = arg.getMethod("getDefaultInstance");
-      newInstMethod.setAccessible(true);
-      protoType = (Message) newInstMethod.invoke(null, (Object[]) null);
-      methodArg.put(method.getName(), protoType);
-      return protoType;
-    }
     /**
      * Logs an RPC response to the LOG file, producing valid JSON objects for
      * client Operations.
@@ -361,10 +290,12 @@ class ProtobufRpcServerEngine implements
             mapper.writeValueAsString(responseInfo));
       }
     }
+
     protected static void log(String value, Log LOG) {
       String v = value;
-      if (v != null && v.length() > 55)
-        v = v.substring(0, 55)+"...";
+      final int max = 100;
+      if (v != null && v.length() > max)
+        v = v.substring(0, max) + "...";
       LOG.info(v);
     }
   }