You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2012/08/19 23:47:22 UTC

svn commit: r1374860 [1/2] - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/ipc/ main/java/org/apache/hadoop/hbase/monitoring/ main/java/org/apache/hadoop/hbase/protobuf/generated/ main/java/org/apache/hadoop/hbase/regionserver/ ma...

Author: stack
Date: Sun Aug 19 21:47:21 2012
New Revision: 1374860

URL: http://svn.apache.org/viewvc?rev=1374860&view=rev
Log:
HBASE-6414 Remove the WritableRpcEngine & associated Invocation classes

Added:
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/protobuf/generated/TestDelayedRpcProtos.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java
    hbase/trunk/hbase-server/src/test/protobuf/test_delayed_rpc.proto
Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientCache.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcEngine.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/hbase-server/src/main/protobuf/RPC.proto
    hbase/trunk/hbase-server/src/main/resources/hbase-default.xml
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestPBOnWritableRpc.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestHMasterRPCException.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientCache.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientCache.java?rev=1374860&r1=1374859&r2=1374860&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientCache.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientCache.java Sun Aug 19 21:47:21 2012
@@ -54,14 +54,10 @@ class ClientCache {
    */
   protected synchronized HBaseClient getClient(Configuration conf,
       SocketFactory factory) {
-    return getClient(conf, factory, HbaseObjectWritable.class);
-  }
-  protected synchronized HBaseClient getClient(Configuration conf,
-      SocketFactory factory, Class<? extends Writable> valueClass) {
     HBaseClient client = clients.get(factory);
     if (client == null) {
       // Make an hbase client instead of hadoop Client.
-      client = new HBaseClient(valueClass, conf, factory);
+      client = new HBaseClient(conf, factory);
       clients.put(factory, client);
     } else {
       client.incCount();

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=1374860&r1=1374859&r2=1374860&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Sun Aug 19 21:47:21 2012
@@ -55,6 +55,8 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseBody;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader.Status;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
@@ -69,10 +71,8 @@ import org.apache.hadoop.hbase.security.
 import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
 import org.apache.hadoop.hbase.util.PoolMap;
 import org.apache.hadoop.hbase.util.PoolMap.PoolType;
-import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
@@ -80,11 +80,14 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.token.TokenSelector;
-import org.apache.hadoop.util.ReflectionUtils;
 
+import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.Message;
+import com.google.protobuf.Message.Builder;
 
-/** A client for an IPC service.  IPC calls take a single {@link Writable} as a
- * parameter, and return a {@link Writable} as their value.  A service runs on
+
+/** A client for an IPC service.  IPC calls take a single Protobuf message as a
+ * parameter, and return a single Protobuf message as their value.  A service runs on
  * a port and is defined by a parameter class and a value class.
  *
  * <p>This is the org.apache.hadoop.ipc.Client renamed as HBaseClient and
@@ -99,7 +102,6 @@ public class HBaseClient {
       .getLog("org.apache.hadoop.ipc.HBaseClient");
   protected final PoolMap<ConnectionId, Connection> connections;
 
-  protected final Class<? extends Writable> valueClass;   // class of call values
   protected int counter;                            // counter for call ids
   protected final AtomicBoolean running = new AtomicBoolean(true); // if client runs
   final protected Configuration conf;
@@ -187,13 +189,13 @@ public class HBaseClient {
   /** A call waiting for a value. */
   protected class Call {
     final int id;                                       // call id
-    final Writable param;                               // parameter
-    Writable value;                               // value, null if error
+    final RpcRequestBody param;                         // rpc request object
+    Message value;                               // value, null if error
     IOException error;                            // exception, null if value
     boolean done;                                 // true when call is done
     long startTime;
 
-    protected Call(Writable param) {
+    protected Call(RpcRequestBody param) {
       this.param = param;
       this.startTime = System.currentTimeMillis();
       synchronized (HBaseClient.this) {
@@ -223,7 +225,7 @@ public class HBaseClient {
      *
      * @param value return value of the call.
      */
-    public synchronized void setValue(Writable value) {
+    public synchronized void setValue(Message value) {
       this.value = value;
       callComplete();
     }
@@ -825,15 +827,19 @@ public class HBaseClient {
       try {
         if (LOG.isDebugEnabled())
           LOG.debug(getName() + " sending #" + call.id);
-        RpcRequestHeader.Builder builder = RPCProtos.RpcRequestHeader.newBuilder();
-        builder.setCallId(call.id);
-        DataOutputBuffer d = new DataOutputBuffer();
-        builder.build().writeDelimitedTo(d);
-        call.param.write(d);
+        RpcRequestHeader.Builder headerBuilder = RPCProtos.RpcRequestHeader.newBuilder();
+        headerBuilder.setCallId(call.id);
         //noinspection SynchronizeOnNonFinalField
         synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC
-          this.out.writeInt(d.getLength());
-          this.out.write(d.getData(), 0, d.getLength());
+          RpcRequestHeader header = headerBuilder.build();
+          int serializedHeaderSize = header.getSerializedSize();
+          int requestSerializedSize = call.param.getSerializedSize();
+          this.out.writeInt(serializedHeaderSize +
+              CodedOutputStream.computeRawVarint32Size(serializedHeaderSize) +
+              requestSerializedSize +
+              CodedOutputStream.computeRawVarint32Size(requestSerializedSize));
+          header.writeDelimitedTo(this.out);
+          call.param.writeDelimitedTo(this.out);
           this.out.flush();
         }
       } catch(IOException e) {
@@ -870,8 +876,17 @@ public class HBaseClient {
 
         Status status = response.getStatus();
         if (status == Status.SUCCESS) {
-          Writable value = ReflectionUtils.newInstance(valueClass, conf);
-          value.readFields(in);                 // read value
+          Message rpcResponseType;
+          try {
+            rpcResponseType = ProtobufRpcEngine.Invoker.getReturnProtoType(
+                ProtobufRpcEngine.Server.getMethod(remoteId.getProtocol(),
+                    call.param.getMethodName()));
+          } catch (Exception e) {
+            throw new RuntimeException(e); //local exception
+          }
+          Builder builder = rpcResponseType.newBuilderForType();
+          builder.mergeDelimitedFrom(in);
+          Message value = builder.build();
           // it's possible that this call may have been cleaned up due to a RPC
           // timeout, so check if it still exists before setting the value.
           if (call != null) {
@@ -983,7 +998,7 @@ public class HBaseClient {
     private final ParallelResults results;
     protected final int index;
 
-    public ParallelCall(Writable param, ParallelResults results, int index) {
+    public ParallelCall(RpcRequestBody param, ParallelResults results, int index) {
       super(param);
       this.results = results;
       this.index = index;
@@ -998,12 +1013,12 @@ public class HBaseClient {
 
   /** Result collector for parallel calls. */
   protected static class ParallelResults {
-    protected final Writable[] values;
+    protected final Message[] values;
     protected int size;
     protected int count;
 
     public ParallelResults(int size) {
-      this.values = new Writable[size];
+      this.values = new RpcResponseBody[size];
       this.size = size;
     }
 
@@ -1020,15 +1035,13 @@ public class HBaseClient {
   }
 
   /**
-   * Construct an IPC client whose values are of the given {@link Writable}
+   * Construct an IPC client whose values are of the {@link Message}
    * class.
    * @param valueClass value class
    * @param conf configuration
    * @param factory socket factory
    */
-  public HBaseClient(Class<? extends Writable> valueClass, Configuration conf,
-      SocketFactory factory) {
-    this.valueClass = valueClass;
+  public HBaseClient(Configuration conf, SocketFactory factory) {
     this.maxIdleTime =
       conf.getInt("hbase.ipc.client.connection.maxidletime", 10000); //10s
     this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
@@ -1051,8 +1064,8 @@ public class HBaseClient {
    * @param valueClass value class
    * @param conf configuration
    */
-  public HBaseClient(Class<? extends Writable> valueClass, Configuration conf) {
-    this(valueClass, conf, NetUtils.getDefaultSocketFactory(conf));
+  public HBaseClient(Configuration conf) {
+    this(conf, NetUtils.getDefaultSocketFactory(conf));
   }
 
   /**
@@ -1124,17 +1137,17 @@ public class HBaseClient {
   /** Make a call, passing <code>param</code>, to the IPC server running at
    * <code>address</code>, returning the value.  Throws exceptions if there are
    * network problems or if the remote code threw an exception.
-   * @param param writable parameter
+   * @param param RpcRequestBody parameter
    * @param address network address
-   * @return Writable
+   * @return Message
    * @throws IOException e
    */
-  public Writable call(Writable param, InetSocketAddress address)
+  public Message call(RpcRequestBody param, InetSocketAddress address)
   throws IOException, InterruptedException {
       return call(param, address, null, 0);
   }
 
-  public Writable call(Writable param, InetSocketAddress addr,
+  public Message call(RpcRequestBody param, InetSocketAddress addr,
                        User ticket, int rpcTimeout)
                        throws IOException, InterruptedException {
     return call(param, addr, null, ticket, rpcTimeout);
@@ -1145,7 +1158,7 @@ public class HBaseClient {
    * with the <code>ticket</code> credentials, returning the value.
    * Throws exceptions if there are network problems or if the remote code
    * threw an exception. */
-  public Writable call(Writable param, InetSocketAddress addr,
+  public Message call(RpcRequestBody param, InetSocketAddress addr,
                        Class<? extends VersionedProtocol> protocol,
                        User ticket, int rpcTimeout)
       throws InterruptedException, IOException {
@@ -1217,14 +1230,14 @@ public class HBaseClient {
    * corresponding address.  When all values are available, or have timed out
    * or errored, the collected results are returned in an array.  The array
    * contains nulls for calls that timed out or errored.
-   * @param params writable parameters
+   * @param params RpcRequestBody parameters
    * @param addresses socket addresses
-   * @return  Writable[]
+   * @return  RpcResponseBody[]
    * @throws IOException e
-   * @deprecated Use {@link #call(Writable[], InetSocketAddress[], Class, User)} instead
+   * @deprecated Use {@link #call(RpcRequestBody[], InetSocketAddress[], Class, User)} instead
    */
   @Deprecated
-  public Writable[] call(Writable[] params, InetSocketAddress[] addresses)
+  public Message[] call(RpcRequestBody[] params, InetSocketAddress[] addresses)
     throws IOException, InterruptedException {
     return call(params, addresses, null, null);
   }
@@ -1233,11 +1246,11 @@ public class HBaseClient {
    * corresponding address.  When all values are available, or have timed out
    * or errored, the collected results are returned in an array.  The array
    * contains nulls for calls that timed out or errored.  */
-  public Writable[] call(Writable[] params, InetSocketAddress[] addresses,
+  public Message[] call(RpcRequestBody[] params, InetSocketAddress[] addresses,
                          Class<? extends VersionedProtocol> protocol,
                          User ticket)
       throws IOException, InterruptedException {
-    if (addresses.length == 0) return new Writable[0];
+    if (addresses.length == 0) return new RpcResponseBody[0];
 
     ParallelResults results = new ParallelResults(params.length);
     // TODO this synchronization block doesnt make any sense, we should possibly fix it

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java?rev=1374860&r1=1374859&r2=1374860&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java Sun Aug 19 21:47:21 2012
@@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.DoNotRetr
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
 import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.ReflectionUtils;
 
@@ -46,24 +45,13 @@ import java.util.Map;
  *
  * This is a local hbase copy of the hadoop RPC so we can do things like
  * address HADOOP-414 for hbase-only and try other hbase-specific
- * optimizations like using our own version of ObjectWritable.  Class has been
- * renamed to avoid confusing it w/ hadoop versions.
+ * optimizations.  Class has been renamed to avoid confusing it w/ hadoop
+ * versions.
  * <p>
  *
  *
  * A <i>protocol</i> is a Java interface.  All parameters and return types must
- * be one of:
- *
- * <ul> <li>a primitive type, <code>boolean</code>, <code>byte</code>,
- * <code>char</code>, <code>short</code>, <code>int</code>, <code>long</code>,
- * <code>float</code>, <code>double</code>, or <code>void</code>; or</li>
- *
- * <li>a {@link String}; or</li>
- *
- * <li>a {@link Writable}; or</li>
- *
- * <li>an array of the above types</li> </ul>
- *
+ * be Protobuf objects.
  * All methods in the protocol should throw only IOException.  No field data of
  * the protocol instance is transmitted.
  */
@@ -122,7 +110,7 @@ public class HBaseRPC {
     if (engine == null) {
       // check for a configured default engine
       Class<?> defaultEngine =
-          conf.getClass(RPC_ENGINE_PROP, WritableRpcEngine.class);
+          conf.getClass(RPC_ENGINE_PROP, ProtobufRpcEngine.class);
 
       // check for a per interface override
       Class<?> impl = conf.getClass(RPC_ENGINE_PROP+"."+protocol.getName(),
@@ -345,16 +333,6 @@ public class HBaseRPC {
     VersionedProtocol proxy = engine
             .getProxy(protocol, clientVersion, addr, ticket, conf, factory,
                 Math.min(rpcTimeout, HBaseRPC.getRpcTimeout()));
-    if (engine instanceof WritableRpcEngine) {
-      long serverVersion = proxy.getProtocolVersion(protocol.getName(),
-          clientVersion);
-      if (serverVersion == clientVersion) {
-        return proxy;
-      }
-
-      throw new VersionMismatch(protocol.getName(), clientVersion,
-                              serverVersion);
-    }
     return proxy;
   }
 

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1374860&r1=1374859&r2=1374860&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Sun Aug 19 21:47:21 2012
@@ -68,12 +68,10 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.io.DataOutputOutputStream;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.io.HbaseObjectWritable;
-import org.apache.hadoop.hbase.io.WritableWithSize;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestHeader;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
@@ -87,9 +85,7 @@ import org.apache.hadoop.hbase.security.
 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslStatus;
 import org.apache.hadoop.hbase.util.ByteBufferOutputStream;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
@@ -104,17 +100,16 @@ import org.apache.hadoop.security.author
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.base.Function;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
 
 import org.cliffc.high_scale_lib.Counter;
 
-/** An abstract IPC service.  IPC calls take a single {@link Writable} as a
- * parameter, and return a {@link Writable} as their value.  A service runs on
+/** A client for an IPC service.  IPC calls take a single Protobuf message as a
+ * parameter, and return a single Protobuf message as their value.  A service runs on
  * a port and is defined by a parameter class and a value class.
  *
  *
@@ -193,8 +188,8 @@ public abstract class HBaseServer implem
   }
 
   /** Returns the server instance called under or null.  May be called under
-   * {@link #call(Class, Writable, long, MonitoredRPCHandler)} implementations,
-   * and under {@link Writable} methods of paramters and return values.
+   * {@link #call(Class, RpcRequestBody, long, MonitoredRPCHandler)} implementations,
+   * and under protobuf methods of paramters and return values.
    * Permits applications to access the server context.
    * @return HBaseServer
    */
@@ -235,7 +230,6 @@ public abstract class HBaseServer implem
   private int handlerCount;                       // number of handler threads
   private int priorityHandlerCount;
   private int readThreads;                        // number of read threads
-  protected Class<? extends Writable> paramClass; // class of call parameters
   protected int maxIdleTime;                      // the maximum idle time after
                                                   // which a client may be
                                                   // disconnected
@@ -312,7 +306,7 @@ public abstract class HBaseServer implem
   /** A call queued for handling. */
   protected class Call implements RpcCallContext {
     protected int id;                             // the client's call id
-    protected Writable param;                     // the parameter passed
+    protected RpcRequestBody rpcRequestBody;                     // the parameter passed
     protected Connection connection;              // connection to client
     protected long timestamp;      // the time received when response is null
                                    // the time served when response is not null
@@ -324,10 +318,10 @@ public abstract class HBaseServer implem
     protected long size;                          // size of current call
     protected boolean isError;
 
-    public Call(int id, Writable param, Connection connection,
+    public Call(int id, RpcRequestBody rpcRequestBody, Connection connection,
         Responder responder, long size) {
       this.id = id;
-      this.param = param;
+      this.rpcRequestBody = rpcRequestBody;
       this.connection = connection;
       this.timestamp = System.currentTimeMillis();
       this.response = null;
@@ -339,7 +333,7 @@ public abstract class HBaseServer implem
 
     @Override
     public String toString() {
-      return param.toString() + " from " + connection.toString();
+      return rpcRequestBody.toString() + " from " + connection.toString();
     }
 
     protected synchronized void setSaslTokenResponse(ByteBuffer response) {
@@ -353,34 +347,13 @@ public abstract class HBaseServer implem
       if (errorClass != null) {
         this.isError = true;
       }
-      Writable result = null;
-      if (value instanceof Writable) {
-        result = (Writable) value;
+ 
+      ByteBufferOutputStream buf = null;
+      if (value != null) {
+        buf = new ByteBufferOutputStream(((Message)value).getSerializedSize());
       } else {
-        /* We might have a null value and errors. Avoid creating a
-         * HbaseObjectWritable, because the constructor fails on null. */
-        if (value != null) {
-          result = new HbaseObjectWritable(value);
-        }
-      }
-
-      int size = BUFFER_INITIAL_SIZE;
-      if (result instanceof WritableWithSize) {
-        // get the size hint.
-        WritableWithSize ohint = (WritableWithSize) result;
-        long hint = ohint.getWritableSize() + 2*Bytes.SIZEOF_INT;
-        if (hint > Integer.MAX_VALUE) {
-          // oops, new problem.
-          IOException ioe =
-            new IOException("Result buffer size too large: " + hint);
-          errorClass = ioe.getClass().getName();
-          error = StringUtils.stringifyException(ioe);
-        } else {
-          size = (int)hint;
-        }
+        buf = new ByteBufferOutputStream(BUFFER_INITIAL_SIZE);
       }
-
-      ByteBufferOutputStream buf = new ByteBufferOutputStream(size);
       DataOutputStream out = new DataOutputStream(buf);
       try {
         RpcResponseHeader.Builder builder = RpcResponseHeader.newBuilder();
@@ -394,7 +367,9 @@ public abstract class HBaseServer implem
           b.setStackTrace(error);
           b.build().writeDelimitedTo(out);
         } else {
-          result.write(out);
+          if (value != null) {
+            ((Message)value).writeDelimitedTo(out);
+          }
         }
         if (connection.useWrap) {
           wrapWithSasl(buf);
@@ -709,7 +684,7 @@ public abstract class HBaseServer implem
             closeCurrentConnection(key, e);
             cleanupConnections(true);
             try { Thread.sleep(60000); } catch (Exception ignored) {}
-      }
+          }
         } catch (Exception e) {
           closeCurrentConnection(key, e);
         }
@@ -1418,7 +1393,7 @@ public abstract class HBaseServer implem
             AccessControlException ae = new AccessControlException(
                 "Authentication is required");
             setupResponse(authFailedResponse, authFailedCall, Status.FATAL,
-                null, ae.getClass().getName(), ae.getMessage());
+                ae.getClass().getName(), ae.getMessage());
             responder.doRespond(authFailedCall);
             throw ae;
           }
@@ -1506,7 +1481,7 @@ public abstract class HBaseServer implem
         // Versions 3 and greater can interpret this exception
         // response in the same manner
         setupResponse(buffer, fakeCall, Status.FATAL,
-            null, VersionMismatch.class.getName(), errMsg);
+            VersionMismatch.class.getName(), errMsg);
 
         responder.doRespond(fakeCall);
       }
@@ -1623,23 +1598,21 @@ public abstract class HBaseServer implem
       if (LOG.isDebugEnabled()) {
         LOG.debug(" got call #" + id + ", " + callSize + " bytes");
       }
-
       // Enforcing the call queue size, this triggers a retry in the client
       if ((callSize + callQueueSize.get()) > maxQueueSize) {
         final Call callTooBig =
           new Call(id, null, this, responder, callSize);
         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
-        setupResponse(responseBuffer, callTooBig, Status.FATAL, null,
+        setupResponse(responseBuffer, callTooBig, Status.FATAL,
             IOException.class.getName(),
             "Call queue is full, is ipc.server.max.callqueue.size too small?");
         responder.doRespond(callTooBig);
         return;
       }
 
-      Writable param;
+      RpcRequestBody rpcRequestBody;
       try {
-        param = ReflectionUtils.newInstance(paramClass, conf);//read param
-        param.readFields(dis);
+        rpcRequestBody = RpcRequestBody.parseDelimitedFrom(dis);
       } catch (Throwable t) {
         LOG.warn("Unable to read call parameters for client " +
                  getHostAddress(), t);
@@ -1647,16 +1620,16 @@ public abstract class HBaseServer implem
           new Call(id, null, this, responder, callSize);
         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
 
-        setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null,
+        setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL,
             t.getClass().getName(),
             "IPC server unable to read call parameters: " + t.getMessage());
         responder.doRespond(readParamsFailedCall);
         return;
       }
-      Call call = new Call(id, param, this, responder, callSize);
+      Call call = new Call(id, rpcRequestBody, this, responder, callSize);
       callQueueSize.add(callSize);
 
-      if (priorityCallQueue != null && getQosLevel(param) > highPriorityLevel) {
+      if (priorityCallQueue != null && getQosLevel(rpcRequestBody) > highPriorityLevel) {
         priorityCallQueue.put(call);
         updateCallQueueLenMetrics(priorityCallQueue);
       } else {
@@ -1683,7 +1656,7 @@ public abstract class HBaseServer implem
       } catch (AuthorizationException ae) {
         LOG.debug("Connection authorization failed: "+ae.getMessage(), ae);
         rpcMetrics.authorizationFailures.inc();
-        setupResponse(authFailedResponse, authFailedCall, Status.FATAL, null,
+        setupResponse(authFailedResponse, authFailedCall, Status.FATAL,
             ae.getClass().getName(), ae.getMessage());
         responder.doRespond(authFailedCall);
         return false;
@@ -1785,7 +1758,7 @@ public abstract class HBaseServer implem
 
           String errorClass = null;
           String error = null;
-          Writable value = null;
+          Message value = null;
 
           CurCall.set(call);
           try {
@@ -1802,7 +1775,7 @@ public abstract class HBaseServer implem
             RequestContext.set(User.create(call.connection.user), getRemoteIp(),
                 call.connection.protocol);
             // make the call
-            value = call(call.connection.protocol, call.param, call.timestamp,
+            value = call(call.connection.protocol, call.rpcRequestBody, call.timestamp,
                 status);
           } catch (Throwable e) {
             LOG.debug(getName()+", call "+call+": error: " + e, e);
@@ -1855,7 +1828,7 @@ public abstract class HBaseServer implem
   }
 
 
-  private Function<Writable,Integer> qosFunction = null;
+  private Function<RpcRequestBody,Integer> qosFunction = null;
 
   /**
    * Gets the QOS level for this call.  If it is higher than the highPriorityLevel and there
@@ -1864,16 +1837,16 @@ public abstract class HBaseServer implem
    * @param newFunc
    */
   @Override
-  public void setQosFunction(Function<Writable, Integer> newFunc) {
+  public void setQosFunction(Function<RpcRequestBody, Integer> newFunc) {
     qosFunction = newFunc;
   }
 
-  protected int getQosLevel(Writable param) {
+  protected int getQosLevel(RpcRequestBody rpcRequestBody) {
     if (qosFunction == null) {
       return 0;
     }
 
-    Integer res = qosFunction.apply(param);
+    Integer res = qosFunction.apply(rpcRequestBody);
     if (res == null) {
       return 0;
     }
@@ -1886,14 +1859,13 @@ public abstract class HBaseServer implem
    *
    */
   protected HBaseServer(String bindAddress, int port,
-                        Class<? extends Writable> paramClass, int handlerCount,
+                        int handlerCount,
                         int priorityHandlerCount, Configuration conf, String serverName,
                         int highPriorityLevel)
     throws IOException {
     this.bindAddress = bindAddress;
     this.conf = conf;
     this.port = port;
-    this.paramClass = paramClass;
     this.handlerCount = handlerCount;
     this.priorityHandlerCount = priorityHandlerCount;
     this.socketSendBufferSize = 0;
@@ -1963,26 +1935,10 @@ public abstract class HBaseServer implem
    */
   private void setupResponse(ByteArrayOutputStream response,
                              Call call, Status status,
-                             Writable rv, String errorClass, String error)
+                             String errorClass, String error)
   throws IOException {
     response.reset();
-    DataOutputStream out = new DataOutputStream(response);
-
-    if (status == Status.SUCCESS) {
-      try {
-        rv.write(out);
-        call.setResponse(rv, status, null, null);
-      } catch (Throwable t) {
-        LOG.warn("Error serializing call response for call " + call, t);
-        // Call back to same function - this is OK since the
-        // buffer is reset at the top, and since status is changed
-        // to ERROR it won't infinite loop.
-        call.setResponse(null, status.ERROR, t.getClass().getName(),
-            StringUtils.stringifyException(t));
-      }
-    } else {
-      call.setResponse(rv, status, errorClass, error);
-    }
+    call.setResponse(null, status, errorClass, error);
   }
 
   protected void closeConnection(Connection connection) {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcEngine.java?rev=1374860&r1=1374859&r2=1374860&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcEngine.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcEngine.java Sun Aug 19 21:47:21 2012
@@ -18,14 +18,13 @@
 
 package org.apache.hadoop.hbase.ipc;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 import java.net.InetSocketAddress;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -35,14 +34,20 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.io.DataOutputOutputStream;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.client.Operation;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.security.HBasePolicyProvider;
 import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.io.*;
+import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Objects;
-import org.apache.hadoop.hbase.util.ProtoUtil;
+import org.codehaus.jackson.map.ObjectMapper;
 
 import com.google.protobuf.Message;
 import com.google.protobuf.ServiceException;
@@ -80,8 +85,9 @@ class ProtobufRpcEngine implements RpcEn
     return new Server(instance, ifaces, conf, bindAddress, port, numHandlers,
         metaHandlerCount, verbose, highPriorityLevel);
   }
-  private static class Invoker implements InvocationHandler {
-    private final Map<String, Message> returnTypes =
+
+  static class Invoker implements InvocationHandler {
+    private static final Map<String, Message> returnTypes =
         new ConcurrentHashMap<String, Message>();
     private Class<? extends VersionedProtocol> protocol;
     private InetSocketAddress address;
@@ -97,7 +103,7 @@ class ProtobufRpcEngine implements RpcEn
       this.protocol = protocol;
       this.address = addr;
       this.ticket = ticket;
-      this.client = CLIENTS.getClient(conf, factory, RpcResponseWritable.class);
+      this.client = CLIENTS.getClient(conf, factory);
       this.rpcTimeout = rpcTimeout;
       Long version = Invocation.PROTOCOL_VERSION.get(protocol);
       if (version != null) {
@@ -133,6 +139,7 @@ class ProtobufRpcEngine implements RpcEn
             + method.getName() + "]" + ", Expected: 2, Actual: "
             + params.length);
       }
+      builder.setRequestClassName(param.getClass().getName());
       builder.setRequest(param.toByteString());
       builder.setClientProtocolVersion(clientProtocolVersion);
       rpcRequest = builder.build();
@@ -166,24 +173,20 @@ class ProtobufRpcEngine implements RpcEn
       }
 
       RpcRequestBody rpcRequest = constructRpcRequest(method, args);
-      RpcResponseWritable val = null;
+      Message val = null;
       try {
-        val = (RpcResponseWritable) client.call(
-            new RpcRequestWritable(rpcRequest), address, protocol, ticket,
-            rpcTimeout);
+        val = client.call(rpcRequest, address, protocol, ticket, rpcTimeout);
 
         if (LOG.isDebugEnabled()) {
           long callTime = System.currentTimeMillis() - startTime;
-          LOG.debug("Call: " + method.getName() + " " + callTime);
+          if (LOG.isTraceEnabled()) LOG.trace("Call: " + method.getName() + " " + callTime);
         }
-
-        Message protoType = null;
-        protoType = getReturnProtoType(method);
-        Message returnMessage;
-        returnMessage = protoType.newBuilderForType()
-            .mergeFrom(val.responseMessage).build();
-        return returnMessage;
+        return val;
       } catch (Throwable e) {
+        if (e instanceof RemoteException) {
+          Throwable cause = ((RemoteException)e).unwrapRemoteException();
+          throw new ServiceException(cause);
+        }
         throw new ServiceException(e);
       }
     }
@@ -195,7 +198,7 @@ class ProtobufRpcEngine implements RpcEn
       }
     }
 
-    private Message getReturnProtoType(Method method) throws Exception {
+   static Message getReturnProtoType(Method method) throws Exception {
       if (returnTypes.containsKey(method.getName())) {
         return returnTypes.get(method.getName());
       }
@@ -209,75 +212,7 @@ class ProtobufRpcEngine implements RpcEn
     }
   }
 
-  /**
-   * Writable Wrapper for Protocol Buffer Requests
-   */
-  private static class RpcRequestWritable implements Writable {
-    RpcRequestBody message;
-
-    @SuppressWarnings("unused")
-    public RpcRequestWritable() {
-    }
-
-    RpcRequestWritable(RpcRequestBody message) {
-      this.message = message;
-    }
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-      ((Message)message).writeDelimitedTo(
-          DataOutputOutputStream.constructOutputStream(out));
-    }
-
-    @Override
-    public void readFields(DataInput in) throws IOException {
-      int length = ProtoUtil.readRawVarint32(in);
-      byte[] bytes = new byte[length];
-      in.readFully(bytes);
-      message = RpcRequestBody.parseFrom(bytes);
-    }
-    
-    public int getSerializedSize() {
-      return message.getSerializedSize();
-    }
-
-    @Override
-    public String toString() {
-      return " Client Protocol Version: " +
-          message.getClientProtocolVersion() + " MethodName: " +
-          message.getMethodName();
-    }
-  }
-
-  /**
-   * Writable Wrapper for Protocol Buffer Responses
-   */
-  private static class RpcResponseWritable implements Writable {
-    byte[] responseMessage;
-
-    @SuppressWarnings("unused")
-    public RpcResponseWritable() {
-    }
-
-    public RpcResponseWritable(Message message) {
-      this.responseMessage = message.toByteArray();
-    }
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-      out.writeInt(responseMessage.length);
-      out.write(responseMessage);
-    }
-
-    @Override
-    public void readFields(DataInput in) throws IOException {
-      int length = in.readInt();
-      byte[] bytes = new byte[length];
-      in.readFully(bytes);
-      responseMessage = bytes;
-    }
-  }
-  public static class Server extends WritableRpcEngine.Server {
+  public static class Server extends HBaseServer {
     boolean verbose;
     Object instance;
     Class<?> implementation;
@@ -295,16 +230,27 @@ class ProtobufRpcEngine implements RpcEn
 
     private final int warnResponseTime;
     private final int warnResponseSize;
+
+    private static String classNameBase(String className) {
+      String[] names = className.split("\\.", -1);
+      if (names == null || names.length == 0) {
+        return className;
+      }
+      return names[names.length-1];
+    }
+
     public Server(Object instance, final Class<?>[] ifaces,
         Configuration conf, String bindAddress,  int port,
         int numHandlers, int metaHandlerCount, boolean verbose,
         int highPriorityLevel)
         throws IOException {
-      super(instance, ifaces, RpcRequestWritable.class, conf, bindAddress, port,
-          numHandlers, metaHandlerCount, verbose, highPriorityLevel);
-      this.verbose = verbose;
+      super(bindAddress, port, numHandlers, metaHandlerCount,
+          conf, classNameBase(instance.getClass().getName()),
+          highPriorityLevel);
       this.instance = instance;
       this.implementation = instance.getClass();
+      this.verbose = verbose;
+
       // create metrics for the advertised interfaces this server implements.
       String [] metricSuffixes = new String [] {ABOVE_ONE_SEC_METRIC};
       this.rpcMetrics.createMetrics(ifaces, false, metricSuffixes);
@@ -313,23 +259,55 @@ class ProtobufRpcEngine implements RpcEn
           DEFAULT_WARN_RESPONSE_TIME);
       this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE,
           DEFAULT_WARN_RESPONSE_SIZE);
+      this.verbose = verbose;
+      this.instance = instance;
+      this.implementation = instance.getClass();
     }
-    private final Map<String, Message> methodArg =
+    private static final Map<String, Message> methodArg =
         new ConcurrentHashMap<String, Message>();
-    private final Map<String, Method> methodInstances =
+    private static final Map<String, Method> methodInstances =
         new ConcurrentHashMap<String, Method>();
+
+    private AuthenticationTokenSecretManager createSecretManager(){
+      if (!User.isSecurityEnabled() ||
+          !(instance instanceof org.apache.hadoop.hbase.Server)) {
+        return null;
+      }
+      org.apache.hadoop.hbase.Server server =
+          (org.apache.hadoop.hbase.Server)instance;
+      Configuration conf = server.getConfiguration();
+      long keyUpdateInterval =
+          conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000);
+      long maxAge =
+          conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000);
+      return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(),
+          server.getServerName().toString(), keyUpdateInterval, maxAge);
+    }
+
+    @Override
+    public void startThreads() {
+      AuthenticationTokenSecretManager mgr = createSecretManager();
+      if (mgr != null) {
+        setSecretManager(mgr);
+        mgr.start();
+      }
+      this.authManager = new ServiceAuthorizationManager();
+      HBasePolicyProvider.init(conf, authManager);
+
+      // continue with base startup
+      super.startThreads();
+    }
+
     @Override
     /**
      * This is a server side method, which is invoked over RPC. On success
      * the return response has protobuf response payload. On failure, the
      * exception name and the stack trace are returned in the protobuf response.
      */
-    public Writable call(Class<? extends VersionedProtocol> protocol,
-        Writable writableRequest, long receiveTime, MonitoredRPCHandler status)
+    public Message call(Class<? extends VersionedProtocol> protocol,
+        RpcRequestBody rpcRequest, long receiveTime, MonitoredRPCHandler status)
         throws IOException {
       try {
-        RpcRequestWritable request = (RpcRequestWritable) writableRequest;
-        RpcRequestBody rpcRequest = request.message;
         String methodName = rpcRequest.getMethodName();
         Method method = getMethod(protocol, methodName);
         if (method == null) {
@@ -358,7 +336,7 @@ class ProtobufRpcEngine implements RpcEn
 
         status.setRPC(rpcRequest.getMethodName(), 
             new Object[]{rpcRequest.getRequest()}, receiveTime);
-        status.setRPCPacket(writableRequest);
+        status.setRPCPacket(rpcRequest);
         status.resume("Servicing call");        
         //get an instance of the method arg type
         Message protoType = getMethodArgType(method);
@@ -398,7 +376,7 @@ class ProtobufRpcEngine implements RpcEn
         rpcMetrics.rpcProcessingTime.inc(processingTime);
         rpcMetrics.inc(method.getName(), processingTime);
         if (verbose) {
-          WritableRpcEngine.log("Return: "+result, LOG);
+          log("Return: "+result, LOG);
         }
         long responseSize = result.getSerializedSize();
         // log any RPC responses that are slower than the configured warn
@@ -432,7 +410,7 @@ class ProtobufRpcEngine implements RpcEn
           rpcMetrics.inc(method.getName() + ABOVE_ONE_SEC_METRIC,
               processingTime);
         }
-        return new RpcResponseWritable(result);
+        return result;
       } catch (InvocationTargetException e) {
         Throwable target = e.getTargetException();
         if (target instanceof IOException) {
@@ -454,7 +432,7 @@ class ProtobufRpcEngine implements RpcEn
       }
     }
 
-    private Method getMethod(Class<? extends VersionedProtocol> protocol,
+    static Method getMethod(Class<? extends VersionedProtocol> protocol,
         String methodName) {
       Method method = methodInstances.get(methodName);
       if (method != null) {
@@ -472,7 +450,7 @@ class ProtobufRpcEngine implements RpcEn
       return null;
     }
 
-    private Message getMethodArgType(Method method) throws Exception {
+    static Message getMethodArgType(Method method) throws Exception {
       Message protoType = methodArg.get(method.getName());
       if (protoType != null) {
         return protoType;
@@ -497,5 +475,68 @@ class ProtobufRpcEngine implements RpcEn
       methodArg.put(method.getName(), protoType);
       return protoType;
     }
+    /**
+     * Logs an RPC response to the LOG file, producing valid JSON objects for
+     * client Operations.
+     * @param params The parameters received in the call.
+     * @param methodName The name of the method invoked
+     * @param call The string representation of the call
+     * @param tag  The tag that will be used to indicate this event in the log.
+     * @param client          The address of the client who made this call.
+     * @param startTime       The time that the call was initiated, in ms.
+     * @param processingTime  The duration that the call took to run, in ms.
+     * @param qTime           The duration that the call spent on the queue
+     *                        prior to being initiated, in ms.
+     * @param responseSize    The size in bytes of the response buffer.
+     */
+     void logResponse(Object[] params, String methodName, String call, String tag,
+         String clientAddress, long startTime, int processingTime, int qTime,
+         long responseSize)
+      throws IOException {
+      // for JSON encoding
+      ObjectMapper mapper = new ObjectMapper();
+      // base information that is reported regardless of type of call
+      Map<String, Object> responseInfo = new HashMap<String, Object>();
+      responseInfo.put("starttimems", startTime);
+      responseInfo.put("processingtimems", processingTime);
+      responseInfo.put("queuetimems", qTime);
+      responseInfo.put("responsesize", responseSize);
+      responseInfo.put("client", clientAddress);
+      responseInfo.put("class", instance.getClass().getSimpleName());
+      responseInfo.put("method", methodName);
+      if (params.length == 2 && instance instanceof HRegionServer &&
+          params[0] instanceof byte[] &&
+          params[1] instanceof Operation) {
+        // if the slow process is a query, we want to log its table as well
+        // as its own fingerprint
+        byte [] tableName =
+          HRegionInfo.parseRegionName((byte[]) params[0])[0];
+        responseInfo.put("table", Bytes.toStringBinary(tableName));
+        // annotate the response map with operation details
+        responseInfo.putAll(((Operation) params[1]).toMap());
+        // report to the log file
+        LOG.warn("(operation" + tag + "): " +
+            mapper.writeValueAsString(responseInfo));
+      } else if (params.length == 1 && instance instanceof HRegionServer &&
+          params[0] instanceof Operation) {
+        // annotate the response map with operation details
+        responseInfo.putAll(((Operation) params[0]).toMap());
+        // report to the log file
+        LOG.warn("(operation" + tag + "): " +
+            mapper.writeValueAsString(responseInfo));
+      } else {
+        // can't get JSON details, so just report call.toString() along with
+        // a more generic tag.
+        responseInfo.put("call", call);
+        LOG.warn("(response" + tag + "): " +
+            mapper.writeValueAsString(responseInfo));
+      }
+    }
+    protected static void log(String value, Log LOG) {
+      String v = value;
+      if (v != null && v.length() > 55)
+        v = v.substring(0, 55)+"...";
+      LOG.info(v);
+    }
   }
-}
\ No newline at end of file
+}

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java?rev=1374860&r1=1374859&r2=1374860&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java Sun Aug 19 21:47:21 2012
@@ -21,10 +21,12 @@
 package org.apache.hadoop.hbase.ipc;
 
 import com.google.common.base.Function;
-import org.apache.hadoop.io.Writable;
+import com.google.protobuf.Message;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.ipc.VersionedProtocol;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -47,16 +49,16 @@ public interface RpcServer {
   /** Called for each call.
    * @param param writable parameter
    * @param receiveTime time
-   * @return Writable
+   * @return Message
    * @throws java.io.IOException e
    */
-  Writable call(Class<? extends VersionedProtocol> protocol,
-      Writable param, long receiveTime, MonitoredRPCHandler status)
+  Message call(Class<? extends VersionedProtocol> protocol,
+      RpcRequestBody param, long receiveTime, MonitoredRPCHandler status)
       throws IOException;
 
   void setErrorHandler(HBaseRPCErrorHandler handler);
 
-  void setQosFunction(Function<Writable, Integer> newFunc);
+  void setQosFunction(Function<RpcRequestBody, Integer> newFunc);
 
   void openServer();
 

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java?rev=1374860&r1=1374859&r2=1374860&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java Sun Aug 19 21:47:21 2012
@@ -1,468 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.ipc;
-
-import java.lang.reflect.Proxy;
-import java.lang.reflect.Method;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.UndeclaredThrowableException;
-
-import java.net.InetSocketAddress;
-import java.io.*;
-import java.util.Map;
-import java.util.HashMap;
-
-import javax.net.SocketFactory;
-
-import org.apache.commons.logging.*;
-
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.client.Operation;
-import org.apache.hadoop.hbase.io.HbaseObjectWritable;
-import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Objects;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
-import org.apache.hadoop.hbase.security.HBasePolicyProvider;
-import org.apache.hadoop.hbase.ipc.VersionedProtocol;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.*;
-
-import org.codehaus.jackson.map.ObjectMapper;
-
-import com.google.protobuf.ServiceException;
-
-/** An RpcEngine implementation for Writable data. */
-@InterfaceAudience.Private
-class WritableRpcEngine implements RpcEngine {
-  // LOG is NOT in hbase subpackage intentionally so that the default HBase
-  // DEBUG log level does NOT emit RPC-level logging.
-  private static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.RPCEngine");
-
-  protected final static ClientCache CLIENTS = new ClientCache();
-
-  private static class Invoker implements InvocationHandler {
-    private Class<? extends VersionedProtocol> protocol;
-    private InetSocketAddress address;
-    private User ticket;
-    private HBaseClient client;
-    private boolean isClosed = false;
-    final private int rpcTimeout;
-
-    public Invoker(Class<? extends VersionedProtocol> protocol,
-                   InetSocketAddress address, User ticket,
-                   Configuration conf, SocketFactory factory, int rpcTimeout) {
-      this.protocol = protocol;
-      this.address = address;
-      this.ticket = ticket;
-      this.client = CLIENTS.getClient(conf, factory);
-      this.rpcTimeout = rpcTimeout;
-    }
-
-    public Object invoke(Object proxy, Method method, Object[] args)
-        throws Throwable {
-      final boolean logDebug = LOG.isDebugEnabled();
-      long startTime = 0;
-      if (logDebug) {
-        startTime = System.currentTimeMillis();
-      }
-
-      try {
-        HbaseObjectWritable value = (HbaseObjectWritable)
-          client.call(new Invocation(method, args), address, protocol, ticket, 
-                      rpcTimeout);
-        if (logDebug) {
-          // FIGURE HOW TO TURN THIS OFF!
-          long callTime = System.currentTimeMillis() - startTime;
-          LOG.debug("Call: " + method.getName() + " " + callTime);
-        }
-        return value.get();
-      } catch (Throwable t) {
-        // For protobuf protocols, ServiceException is expected
-        if (Invocation.PROTOBUF_PROTOCOLS.contains(protocol)) {
-          if (t instanceof RemoteException) {
-            Throwable cause = ((RemoteException)t).unwrapRemoteException();
-            throw new ServiceException(cause);
-          }
-          throw new ServiceException(t);
-        }
-        throw t;
-      }
-    }
-
-    /* close the IPC client that's responsible for this invoker's RPCs */
-    synchronized protected void close() {
-      if (!isClosed) {
-        isClosed = true;
-        CLIENTS.stopClient(client);
-      }
-    }
-  }
-
-  /** Construct a client-side proxy object that implements the named protocol,
-   * talking to a server at the named address. */
-  public VersionedProtocol getProxy(
-      Class<? extends VersionedProtocol> protocol, long clientVersion,
-      InetSocketAddress addr, User ticket,
-      Configuration conf, SocketFactory factory, int rpcTimeout)
-    throws IOException {
-
-      VersionedProtocol proxy =
-          (VersionedProtocol) Proxy.newProxyInstance(
-              protocol.getClassLoader(), new Class[] { protocol },
-              new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));
-      try {
-        long serverVersion = ((VersionedProtocol)proxy)
-          .getProtocolVersion(protocol.getName(), clientVersion);
-        if (serverVersion != clientVersion) {
-          throw new HBaseRPC.VersionMismatch(protocol.getName(), clientVersion,
-                                        serverVersion);
-        }
-      } catch (Throwable t) {
-        if (t instanceof UndeclaredThrowableException) {
-          t = t.getCause();
-        }
-        if (t instanceof ServiceException) {
-          throw ProtobufUtil.getRemoteException((ServiceException)t);
-        }
-        if (!(t instanceof IOException)) {
-          LOG.error("Unexpected throwable object ", t);
-          throw new IOException(t);
-        }
-        throw (IOException)t;
-      }
-    return proxy;
-  }
-
-  /**
-   * Stop this proxy and release its invoker's resource
-   * @param proxy the proxy to be stopped
-   */
-  public void stopProxy(VersionedProtocol proxy) {
-    if (proxy!=null) {
-      ((Invoker)Proxy.getInvocationHandler(proxy)).close();
-    }
-  }
-
-  /** Construct a server for a protocol implementation instance listening on a
-   * port and address. */
-  public Server getServer(Class<? extends VersionedProtocol> protocol,
-                          Object instance,
-                          Class<?>[] ifaces,
-                          String bindAddress, int port,
-                          int numHandlers,
-                          int metaHandlerCount, boolean verbose,
-                          Configuration conf, int highPriorityLevel)
-    throws IOException {
-    return new Server(instance, ifaces, conf, bindAddress, port, numHandlers,
-        metaHandlerCount, verbose, highPriorityLevel);
-  }
-
-  /** An RPC Server. */
-  public static class Server extends HBaseServer {
-    private Object instance;
-    private Class<?> implementation;
-    private Class<?>[] ifaces;
-    private boolean verbose;
-
-    private static final String WARN_RESPONSE_TIME =
-      "hbase.ipc.warn.response.time";
-    private static final String WARN_RESPONSE_SIZE =
-      "hbase.ipc.warn.response.size";
-
-    /** Default value for above params */
-    private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
-    private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
-
-    /** Names for suffixed metrics */
-    private static final String ABOVE_ONE_SEC_METRIC = ".aboveOneSec.";
-
-    private final int warnResponseTime;
-    private final int warnResponseSize;
-
-    private static String classNameBase(String className) {
-      String[] names = className.split("\\.", -1);
-      if (names == null || names.length == 0) {
-        return className;
-      }
-      return names[names.length-1];
-    }
-
-    /** Construct an RPC server.
-     * @param instance the instance whose methods will be called
-     * @param ifaces the interfaces the server supports
-     * @param paramClass an instance of this class is used to read the RPC requests
-     * @param conf the configuration to use
-     * @param bindAddress the address to bind on to listen for connection
-     * @param port the port to listen for connections on
-     * @param numHandlers the number of method handler threads to run
-     * @param metaHandlerCount the number of meta handlers desired
-     * @param verbose whether each call should be logged
-     * @param highPriorityLevel the priority level this server treats as high priority RPCs
-     * @throws IOException e
-     */
-    public Server(Object instance, final Class<?>[] ifaces,
-                  Class<? extends Writable> paramClass,
-                  Configuration conf, String bindAddress,  int port,
-                  int numHandlers, int metaHandlerCount, boolean verbose,
-                  int highPriorityLevel) throws IOException {
-      super(bindAddress, port, paramClass, numHandlers, metaHandlerCount,
-          conf, classNameBase(instance.getClass().getName()),
-          highPriorityLevel);
-      this.instance = instance;
-      this.implementation = instance.getClass();
-      this.verbose = verbose;
-
-      this.ifaces = ifaces;
-
-      // create metrics for the advertised interfaces this server implements.
-      String [] metricSuffixes = new String [] {ABOVE_ONE_SEC_METRIC};
-      this.rpcMetrics.createMetrics(this.ifaces, false, metricSuffixes);
-
-      this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME,
-          DEFAULT_WARN_RESPONSE_TIME);
-      this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE,
-          DEFAULT_WARN_RESPONSE_SIZE);
-    }
-
-    public Server(Object instance, final Class<?>[] ifaces,
-        Configuration conf, String bindAddress,  int port,
-        int numHandlers, int metaHandlerCount, boolean verbose,
-        int highPriorityLevel) throws IOException {
-      this(instance, ifaces, Invocation.class, conf, bindAddress, port, 
-          numHandlers, metaHandlerCount, verbose, highPriorityLevel);
-    }
-
-    public AuthenticationTokenSecretManager createSecretManager(){
-      if (!User.isSecurityEnabled() ||
-          !(instance instanceof org.apache.hadoop.hbase.Server)) {
-        return null;
-      }
-      org.apache.hadoop.hbase.Server server =
-          (org.apache.hadoop.hbase.Server)instance;
-      Configuration conf = server.getConfiguration();
-      long keyUpdateInterval =
-          conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000);
-      long maxAge =
-          conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000);
-      return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(),
-          server.getServerName().toString(), keyUpdateInterval, maxAge);
-    }
-
-    @Override
-    public void startThreads() {
-      AuthenticationTokenSecretManager mgr = createSecretManager();
-      if (mgr != null) {
-        setSecretManager(mgr);
-        mgr.start();
-      }
-      this.authManager = new ServiceAuthorizationManager();
-      HBasePolicyProvider.init(conf, authManager);
-
-      // continue with base startup
-      super.startThreads();
-    }
-
-    @Override
-    public Writable call(Class<? extends VersionedProtocol> protocol,
-        Writable param, long receivedTime, MonitoredRPCHandler status)
-    throws IOException {
-      try {
-        Invocation call = (Invocation)param;
-        if(call.getMethodName() == null) {
-          throw new IOException("Could not find requested method, the usual " +
-              "cause is a version mismatch between client and server.");
-        }
-        if (verbose) log("Call: " + call, LOG);
-        status.setRPC(call.getMethodName(), call.getParameters(), receivedTime);
-        status.setRPCPacket(param);
-        status.resume("Servicing call");
-
-        Method method =
-          protocol.getMethod(call.getMethodName(),
-                                   call.getParameterClasses());
-        method.setAccessible(true);
-
-        //Verify protocol version.
-        //Bypass the version check for VersionedProtocol
-        if (!method.getDeclaringClass().equals(VersionedProtocol.class)) {
-          long clientVersion = call.getProtocolVersion();
-          ProtocolSignature serverInfo = ((VersionedProtocol) instance)
-              .getProtocolSignature(protocol.getCanonicalName(), call
-                  .getProtocolVersion(), call.getClientMethodsHash());
-          long serverVersion = serverInfo.getVersion();
-          if (serverVersion != clientVersion) {
-            LOG.warn("Version mismatch: client version=" + clientVersion
-                + ", server version=" + serverVersion);
-            throw new RPC.VersionMismatch(protocol.getName(), clientVersion,
-                serverVersion);
-          }
-        }
-        Object impl = null;
-        if (protocol.isAssignableFrom(this.implementation)) {
-          impl = this.instance;
-        }
-        else {
-          throw new HBaseRPC.UnknownProtocolException(protocol);
-        }
-
-        long startTime = System.currentTimeMillis();
-        Object[] params = call.getParameters();
-        Object value = method.invoke(impl, params);
-        int processingTime = (int) (System.currentTimeMillis() - startTime);
-        int qTime = (int) (startTime-receivedTime);
-        if (TRACELOG.isDebugEnabled()) {
-          TRACELOG.debug("Call #" + CurCall.get().id +
-              "; Served: " + protocol.getSimpleName()+"#"+call.getMethodName() +
-              " queueTime=" + qTime +
-              " processingTime=" + processingTime +
-              " contents=" + Objects.describeQuantity(params));
-        }
-        rpcMetrics.rpcQueueTime.inc(qTime);
-        rpcMetrics.rpcProcessingTime.inc(processingTime);
-        rpcMetrics.inc(call.getMethodName(), processingTime);
-        if (verbose) log("Return: "+value, LOG);
-
-        HbaseObjectWritable retVal =
-          new HbaseObjectWritable(method.getReturnType(), value);
-        long responseSize = retVal.getWritableSize();
-        // log any RPC responses that are slower than the configured warn
-        // response time or larger than configured warning size
-        boolean tooSlow = (processingTime > warnResponseTime
-            && warnResponseTime > -1);
-        boolean tooLarge = (responseSize > warnResponseSize
-            && warnResponseSize > -1);
-        if (tooSlow || tooLarge) {
-          // when tagging, we let TooLarge trump TooSmall to keep output simple
-          // note that large responses will often also be slow.
-          logResponse(call.getParameters(), call.getMethodName(), 
-              call.toString(), (tooLarge ? "TooLarge" : "TooSlow"),
-              status.getClient(), startTime, processingTime, qTime,
-              responseSize);
-          // provides a count of log-reported slow responses
-          if (tooSlow) {
-            rpcMetrics.rpcSlowResponseTime.inc(processingTime);
-          }
-        }
-        if (processingTime > 1000) {
-          // we use a hard-coded one second period so that we can clearly
-          // indicate the time period we're warning about in the name of the 
-          // metric itself
-          rpcMetrics.inc(call.getMethodName() + ABOVE_ONE_SEC_METRIC,
-              processingTime);
-        }
-
-        return retVal;
-      } catch (InvocationTargetException e) {
-        Throwable target = e.getTargetException();
-        if (target instanceof IOException) {
-          throw (IOException)target;
-        }
-        if (target instanceof ServiceException) {
-          throw ProtobufUtil.getRemoteException((ServiceException)target);
-        }
-        IOException ioe = new IOException(target.toString());
-        ioe.setStackTrace(target.getStackTrace());
-        throw ioe;
-      } catch (Throwable e) {
-        if (!(e instanceof IOException)) {
-          LOG.error("Unexpected throwable object ", e);
-        }
-        IOException ioe = new IOException(e.toString());
-        ioe.setStackTrace(e.getStackTrace());
-        throw ioe;
-      }
-    }
-
-    /**
-     * Logs an RPC response to the LOG file, producing valid JSON objects for
-     * client Operations.
-     * @param params The parameters received in the call.
-     * @param methodName The name of the method invoked
-     * @param call The string representation of the call
-     * @param tag  The tag that will be used to indicate this event in the log.
-     * @param client          The address of the client who made this call.
-     * @param startTime       The time that the call was initiated, in ms.
-     * @param processingTime  The duration that the call took to run, in ms.
-     * @param qTime           The duration that the call spent on the queue 
-     *                        prior to being initiated, in ms.
-     * @param responseSize    The size in bytes of the response buffer.
-     */
-     void logResponse(Object[] params, String methodName, String call, String tag, 
-         String clientAddress, long startTime, int processingTime, int qTime, 
-         long responseSize)
-      throws IOException {
-      // for JSON encoding
-      ObjectMapper mapper = new ObjectMapper();
-      // base information that is reported regardless of type of call
-      Map<String, Object> responseInfo = new HashMap<String, Object>();
-      responseInfo.put("starttimems", startTime);
-      responseInfo.put("processingtimems", processingTime);
-      responseInfo.put("queuetimems", qTime);
-      responseInfo.put("responsesize", responseSize);
-      responseInfo.put("client", clientAddress);
-      responseInfo.put("class", instance.getClass().getSimpleName());
-      responseInfo.put("method", methodName);
-      if (params.length == 2 && instance instanceof HRegionServer &&
-          params[0] instanceof byte[] &&
-          params[1] instanceof Operation) {
-        // if the slow process is a query, we want to log its table as well 
-        // as its own fingerprint
-        byte [] tableName =
-          HRegionInfo.parseRegionName((byte[]) params[0])[0];
-        responseInfo.put("table", Bytes.toStringBinary(tableName));
-        // annotate the response map with operation details
-        responseInfo.putAll(((Operation) params[1]).toMap());
-        // report to the log file
-        LOG.warn("(operation" + tag + "): " +
-            mapper.writeValueAsString(responseInfo));
-      } else if (params.length == 1 && instance instanceof HRegionServer &&
-          params[0] instanceof Operation) {
-        // annotate the response map with operation details
-        responseInfo.putAll(((Operation) params[0]).toMap());
-        // report to the log file
-        LOG.warn("(operation" + tag + "): " +
-            mapper.writeValueAsString(responseInfo));
-      } else {
-        // can't get JSON details, so just report call.toString() along with 
-        // a more generic tag.
-        responseInfo.put("call", call);
-        LOG.warn("(response" + tag + "): " +
-            mapper.writeValueAsString(responseInfo));
-      }
-    }
-  }
-
-  protected static void log(String value, Log LOG) {
-    String v = value;
-    if (v != null && v.length() > 55)
-      v = v.substring(0, 55)+"...";
-    LOG.info(v);
-  }
-}

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java?rev=1374860&r1=1374859&r2=1374860&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java Sun Aug 19 21:47:21 2012
@@ -20,8 +20,7 @@
 package org.apache.hadoop.hbase.monitoring;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
 
 /**
  * A MonitoredTask implementation optimized for use with RPC Handlers 
@@ -38,9 +37,9 @@ public interface MonitoredRPCHandler ext
   public abstract long getRPCQueueTime();
   public abstract boolean isRPCRunning();
   public abstract boolean isOperationRunning();
-  
+
   public abstract void setRPC(String methodName, Object [] params,
       long queueTime);
-  public abstract void setRPCPacket(Writable param);
+  public abstract void setRPCPacket(RpcRequestBody param);
   public abstract void setConnection(String clientAddress, int remotePort);
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java?rev=1374860&r1=1374859&r2=1374860&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java Sun Aug 19 21:47:21 2012
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.monitori
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Operation;
 import org.apache.hadoop.hbase.io.WritableWithSize;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Writable;
 
@@ -46,7 +47,7 @@ public class MonitoredRPCHandlerImpl ext
   private long rpcStartTime;
   private String methodName = "";
   private Object [] params = {};
-  private Writable packet;
+  private RpcRequestBody packet;
 
   public MonitoredRPCHandlerImpl() {
     super();
@@ -141,11 +142,7 @@ public class MonitoredRPCHandlerImpl ext
       // no RPC is currently running, or we don't have an RPC's packet info
       return -1L;
     }
-    if (!(packet instanceof WritableWithSize)) {
-      // the packet passed to us doesn't expose size information
-      return -1L;
-    }
-    return ((WritableWithSize) packet).getWritableSize();
+    return packet.getSerializedSize();
   }
 
   /**
@@ -201,11 +198,11 @@ public class MonitoredRPCHandlerImpl ext
   }
 
   /**
-   * Gives this instance a reference to the Writable received by the RPC, so 
+   * Gives this instance a reference to the protobuf received by the RPC, so 
    * that it can later compute its size if asked for it.
-   * @param param The Writable received by the RPC for this call
+   * @param param The protobuf received by the RPC for this call
    */
-  public void setRPCPacket(Writable param) {
+  public void setRPCPacket(RpcRequestBody param) {
     this.packet = param;
   }
 

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java?rev=1374860&r1=1374859&r2=1374860&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java Sun Aug 19 21:47:21 2012
@@ -1492,6 +1492,10 @@ public final class RPCProtos {
     // optional bytes request = 3;
     boolean hasRequest();
     com.google.protobuf.ByteString getRequest();
+    
+    // optional string requestClassName = 4;
+    boolean hasRequestClassName();
+    String getRequestClassName();
   }
   public static final class RpcRequestBody extends
       com.google.protobuf.GeneratedMessage
@@ -1574,10 +1578,43 @@ public final class RPCProtos {
       return request_;
     }
     
+    // optional string requestClassName = 4;
+    public static final int REQUESTCLASSNAME_FIELD_NUMBER = 4;
+    private java.lang.Object requestClassName_;
+    public boolean hasRequestClassName() {
+      return ((bitField0_ & 0x00000008) == 0x00000008);
+    }
+    public String getRequestClassName() {
+      java.lang.Object ref = requestClassName_;
+      if (ref instanceof String) {
+        return (String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        String s = bs.toStringUtf8();
+        if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+          requestClassName_ = s;
+        }
+        return s;
+      }
+    }
+    private com.google.protobuf.ByteString getRequestClassNameBytes() {
+      java.lang.Object ref = requestClassName_;
+      if (ref instanceof String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+        requestClassName_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+    
     private void initFields() {
       methodName_ = "";
       clientProtocolVersion_ = 0L;
       request_ = com.google.protobuf.ByteString.EMPTY;
+      requestClassName_ = "";
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -1604,6 +1641,9 @@ public final class RPCProtos {
       if (((bitField0_ & 0x00000004) == 0x00000004)) {
         output.writeBytes(3, request_);
       }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        output.writeBytes(4, getRequestClassNameBytes());
+      }
       getUnknownFields().writeTo(output);
     }
     
@@ -1625,6 +1665,10 @@ public final class RPCProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeBytesSize(3, request_);
       }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(4, getRequestClassNameBytes());
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -1663,6 +1707,11 @@ public final class RPCProtos {
         result = result && getRequest()
             .equals(other.getRequest());
       }
+      result = result && (hasRequestClassName() == other.hasRequestClassName());
+      if (hasRequestClassName()) {
+        result = result && getRequestClassName()
+            .equals(other.getRequestClassName());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -1684,6 +1733,10 @@ public final class RPCProtos {
         hash = (37 * hash) + REQUEST_FIELD_NUMBER;
         hash = (53 * hash) + getRequest().hashCode();
       }
+      if (hasRequestClassName()) {
+        hash = (37 * hash) + REQUESTCLASSNAME_FIELD_NUMBER;
+        hash = (53 * hash) + getRequestClassName().hashCode();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       return hash;
     }
@@ -1806,6 +1859,8 @@ public final class RPCProtos {
         bitField0_ = (bitField0_ & ~0x00000002);
         request_ = com.google.protobuf.ByteString.EMPTY;
         bitField0_ = (bitField0_ & ~0x00000004);
+        requestClassName_ = "";
+        bitField0_ = (bitField0_ & ~0x00000008);
         return this;
       }
       
@@ -1856,6 +1911,10 @@ public final class RPCProtos {
           to_bitField0_ |= 0x00000004;
         }
         result.request_ = request_;
+        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+          to_bitField0_ |= 0x00000008;
+        }
+        result.requestClassName_ = requestClassName_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -1881,6 +1940,9 @@ public final class RPCProtos {
         if (other.hasRequest()) {
           setRequest(other.getRequest());
         }
+        if (other.hasRequestClassName()) {
+          setRequestClassName(other.getRequestClassName());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -1931,6 +1993,11 @@ public final class RPCProtos {
               request_ = input.readBytes();
               break;
             }
+            case 34: {
+              bitField0_ |= 0x00000008;
+              requestClassName_ = input.readBytes();
+              break;
+            }
           }
         }
       }
@@ -2018,6 +2085,42 @@ public final class RPCProtos {
         return this;
       }
       
+      // optional string requestClassName = 4;
+      private java.lang.Object requestClassName_ = "";
+      public boolean hasRequestClassName() {
+        return ((bitField0_ & 0x00000008) == 0x00000008);
+      }
+      public String getRequestClassName() {
+        java.lang.Object ref = requestClassName_;
+        if (!(ref instanceof String)) {
+          String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+          requestClassName_ = s;
+          return s;
+        } else {
+          return (String) ref;
+        }
+      }
+      public Builder setRequestClassName(String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000008;
+        requestClassName_ = value;
+        onChanged();
+        return this;
+      }
+      public Builder clearRequestClassName() {
+        bitField0_ = (bitField0_ & ~0x00000008);
+        requestClassName_ = getDefaultInstance().getRequestClassName();
+        onChanged();
+        return this;
+      }
+      void setRequestClassName(com.google.protobuf.ByteString value) {
+        bitField0_ |= 0x00000008;
+        requestClassName_ = value;
+        onChanged();
+      }
+      
       // @@protoc_insertion_point(builder_scope:RpcRequestBody)
     }
     
@@ -2032,7 +2135,7 @@ public final class RPCProtos {
   public interface RpcResponseHeaderOrBuilder
       extends com.google.protobuf.MessageOrBuilder {
     
-    // required int32 callId = 1;
+    // required uint32 callId = 1;
     boolean hasCallId();
     int getCallId();
     
@@ -2141,7 +2244,7 @@ public final class RPCProtos {
     }
     
     private int bitField0_;
-    // required int32 callId = 1;
+    // required uint32 callId = 1;
     public static final int CALLID_FIELD_NUMBER = 1;
     private int callId_;
     public boolean hasCallId() {
@@ -2186,7 +2289,7 @@ public final class RPCProtos {
                         throws java.io.IOException {
       getSerializedSize();
       if (((bitField0_ & 0x00000001) == 0x00000001)) {
-        output.writeInt32(1, callId_);
+        output.writeUInt32(1, callId_);
       }
       if (((bitField0_ & 0x00000002) == 0x00000002)) {
         output.writeEnum(2, status_.getNumber());
@@ -2202,7 +2305,7 @@ public final class RPCProtos {
       size = 0;
       if (((bitField0_ & 0x00000001) == 0x00000001)) {
         size += com.google.protobuf.CodedOutputStream
-          .computeInt32Size(1, callId_);
+          .computeUInt32Size(1, callId_);
       }
       if (((bitField0_ & 0x00000002) == 0x00000002)) {
         size += com.google.protobuf.CodedOutputStream
@@ -2487,7 +2590,7 @@ public final class RPCProtos {
             }
             case 8: {
               bitField0_ |= 0x00000001;
-              callId_ = input.readInt32();
+              callId_ = input.readUInt32();
               break;
             }
             case 16: {
@@ -2507,7 +2610,7 @@ public final class RPCProtos {
       
       private int bitField0_;
       
-      // required int32 callId = 1;
+      // required uint32 callId = 1;
       private int callId_ ;
       public boolean hasCallId() {
         return ((bitField0_ & 0x00000001) == 0x00000001);
@@ -3505,16 +3608,17 @@ public final class RPCProtos {
       "ctionHeader\022\"\n\010userInfo\030\001 \001(\0132\020.UserInfo" +
       "rmation\022?\n\010protocol\030\002 \001(\t:-org.apache.ha" +
       "doop.hbase.client.ClientProtocol\"\"\n\020RpcR" +
-      "equestHeader\022\016\n\006callId\030\001 \002(\r\"T\n\016RpcReque" +
+      "equestHeader\022\016\n\006callId\030\001 \002(\r\"n\n\016RpcReque" +
       "stBody\022\022\n\nmethodName\030\001 \002(\t\022\035\n\025clientProt" +
-      "ocolVersion\030\002 \001(\004\022\017\n\007request\030\003 \001(\014\"{\n\021Rp" +
-      "cResponseHeader\022\016\n\006callId\030\001 \002(\005\022)\n\006statu" +
-      "s\030\002 \002(\0162\031.RpcResponseHeader.Status\"+\n\006St",
-      "atus\022\013\n\007SUCCESS\020\000\022\t\n\005ERROR\020\001\022\t\n\005FATAL\020\002\"" +
-      "#\n\017RpcResponseBody\022\020\n\010response\030\001 \001(\014\"9\n\014" +
-      "RpcException\022\025\n\rexceptionName\030\001 \002(\t\022\022\n\ns" +
-      "tackTrace\030\002 \001(\tB<\n*org.apache.hadoop.hba" +
-      "se.protobuf.generatedB\tRPCProtosH\001\240\001\001"
+      "ocolVersion\030\002 \001(\004\022\017\n\007request\030\003 \001(\014\022\030\n\020re" +
+      "questClassName\030\004 \001(\t\"{\n\021RpcResponseHeade" +
+      "r\022\016\n\006callId\030\001 \002(\r\022)\n\006status\030\002 \002(\0162\031.RpcR",
+      "esponseHeader.Status\"+\n\006Status\022\013\n\007SUCCES" +
+      "S\020\000\022\t\n\005ERROR\020\001\022\t\n\005FATAL\020\002\"#\n\017RpcResponse" +
+      "Body\022\020\n\010response\030\001 \001(\014\"9\n\014RpcException\022\025" +
+      "\n\rexceptionName\030\001 \002(\t\022\022\n\nstackTrace\030\002 \001(" +
+      "\tB<\n*org.apache.hadoop.hbase.protobuf.ge" +
+      "neratedB\tRPCProtosH\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -3550,7 +3654,7 @@ public final class RPCProtos {
           internal_static_RpcRequestBody_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_RpcRequestBody_descriptor,
-              new java.lang.String[] { "MethodName", "ClientProtocolVersion", "Request", },
+              new java.lang.String[] { "MethodName", "ClientProtocolVersion", "Request", "RequestClassName", },
               org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody.class,
               org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody.Builder.class);
           internal_static_RpcResponseHeader_descriptor =