You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2012/07/30 09:07:18 UTC

svn commit: r1367009 [1/4] - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/ipc/ main/java/org/apache/hadoop/hbase/master/ main/java/org/apache/hadoop/hbase/monitoring/ main/java/org/apache/hadoop/hbase/protobuf/generated/ main/jav...

Author: stack
Date: Mon Jul 30 07:07:17 2012
New Revision: 1367009

URL: http://svn.apache.org/viewvc?rev=1367009&view=rev
Log:
HBASE-5705 Introduce Protocol Buffer RPC engine

Added:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientCache.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcEngine.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/protobuf/
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/protobuf/generated/
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/protobuf/generated/TestProtos.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/protobuf/generated/TestRpcServiceProtos.java
    hbase/trunk/hbase-server/src/test/protobuf/
    hbase/trunk/hbase-server/src/test/protobuf/test.proto
    hbase/trunk/hbase-server/src/test/protobuf/test_rpc_service.proto
Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/hbase-server/src/main/protobuf/RPC.proto

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientCache.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientCache.java?rev=1367009&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientCache.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientCache.java Mon Jul 30 07:07:17 2012
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.net.SocketFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Cache a client using its socket factory as the hash key.
+ * Enables reuse/sharing of clients on a per SocketFactory basis. A client 
+ * establishes certain configuration dependent characteristics like timeouts, 
+ * tcp-keepalive (true or false), etc. For more details on the characteristics,
+ * look at {@link HBaseClient#HBaseClient(Class, Configuration, SocketFactory)}
+ * Creating a dynamic proxy for a protocol obtains a client from this cache
+ * (constructing it on first use, incrementing its reference count on reuse).
+ * Stopping a proxy releases that reference; when the count drops to zero, the
+ * cache mapping is removed and the client is stopped.
+ */
+class ClientCache {
+  private Map<SocketFactory, HBaseClient> clients =
+    new HashMap<SocketFactory, HBaseClient>();
+
+  protected ClientCache() {}
+
+  /**
+   * Construct and cache an IPC client with the user-provided SocketFactory
+   * if no cached client exists.
+   *
+   * @param conf Configuration
+   * @param factory socket factory
+   * @return an IPC client
+   */
+  protected synchronized HBaseClient getClient(Configuration conf,
+      SocketFactory factory) {
+    return getClient(conf, factory, HbaseObjectWritable.class);
+  }
+  protected synchronized HBaseClient getClient(Configuration conf,
+      SocketFactory factory, Class<? extends Writable> valueClass) {
+    HBaseClient client = clients.get(factory);
+    if (client == null) {
+      // Make an hbase client instead of hadoop Client.
+      client = new HBaseClient(valueClass, conf, factory);
+      clients.put(factory, client);
+    } else {
+      client.incCount();
+    }
+    return client;
+  }
+
+  /**
+   * Stop an RPC client connection.
+   * An RPC client is closed only when its reference count becomes zero.
+   * @param client client to stop
+   */
+  protected void stopClient(HBaseClient client) {
+    synchronized (this) {
+      client.decCount();
+      if (client.isZeroReference()) {
+        clients.remove(client.getSocketFactory());
+      }
+    }
+    if (client.isZeroReference()) {
+      client.stop();
+    }
+  }
+}
\ No newline at end of file

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=1367009&r1=1367008&r2=1367009&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Mon Jul 30 07:07:17 2012
@@ -53,12 +53,12 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.io.DataOutputOutputStream;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader.Status;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequest;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestHeader;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.AuthMethod;
@@ -82,7 +82,6 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.security.token.TokenSelector;
 import org.apache.hadoop.util.ReflectionUtils;
 
-import com.google.protobuf.ByteString;
 
 /** A client for an IPC service.  IPC calls take a single {@link Writable} as a
  * parameter, and return a {@link Writable} as their value.  A service runs on
@@ -826,17 +825,15 @@ public class HBaseClient {
       try {
         if (LOG.isDebugEnabled())
           LOG.debug(getName() + " sending #" + call.id);
-        RpcRequest.Builder builder = RPCProtos.RpcRequest.newBuilder();
+        RpcRequestHeader.Builder builder = RPCProtos.RpcRequestHeader.newBuilder();
         builder.setCallId(call.id);
-        Invocation invocation = (Invocation)call.param;
         DataOutputBuffer d = new DataOutputBuffer();
-        invocation.write(d);
-        builder.setRequest(ByteString.copyFrom(d.getData()));
+        builder.build().writeDelimitedTo(d);
+        call.param.write(d);
         //noinspection SynchronizeOnNonFinalField
         synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC
-          RpcRequest obj = builder.build();
-          this.out.writeInt(obj.getSerializedSize());
-          obj.writeTo(DataOutputOutputStream.constructOutputStream(this.out));
+          this.out.writeInt(d.getLength());
+          this.out.write(d.getData(), 0, d.getLength());
           this.out.flush();
         }
       } catch(IOException e) {
@@ -859,7 +856,7 @@ public class HBaseClient {
         // so the exception name/trace), and the response bytes
 
         // Read the call id.
-        RpcResponse response = RpcResponse.parseDelimitedFrom(in);
+        RpcResponseHeader response = RpcResponseHeader.parseDelimitedFrom(in);
         if (response == null) {
           // When the stream is closed, protobuf doesn't raise an EOFException,
           // instead, it returns a null message object. 
@@ -873,11 +870,8 @@ public class HBaseClient {
 
         Status status = response.getStatus();
         if (status == Status.SUCCESS) {
-          ByteString responseObj = response.getResponse();
-          DataInputStream dis =
-              new DataInputStream(responseObj.newInput());
           Writable value = ReflectionUtils.newInstance(valueClass, conf);
-          value.readFields(dis);                 // read value
+          value.readFields(in);                 // read value
           // it's possible that this call may have been cleaned up due to a RPC
           // timeout, so check if it still exists before setting the value.
           if (call != null) {
@@ -885,18 +879,20 @@ public class HBaseClient {
           }
           calls.remove(id);
         } else if (status == Status.ERROR) {
+          RpcException exceptionResponse = RpcException.parseDelimitedFrom(in);
           if (call != null) {
             //noinspection ThrowableInstanceNeverThrown
             call.setException(new RemoteException(
-                response.getException().getExceptionName(),
-                response.getException().getStackTrace()));
+                exceptionResponse.getExceptionName(),
+                exceptionResponse.getStackTrace()));
             calls.remove(id);
           }
         } else if (status == Status.FATAL) {
+          RpcException exceptionResponse = RpcException.parseDelimitedFrom(in);
           // Close the connection
           markClosed(new RemoteException(
-              response.getException().getExceptionName(),
-              response.getException().getStackTrace()));
+              exceptionResponse.getExceptionName(),
+              exceptionResponse.getStackTrace()));
         }
       } catch (IOException e) {
         if (e instanceof SocketTimeoutException && remoteId.rpcTimeout > 0) {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java?rev=1367009&r1=1367008&r2=1367009&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java Mon Jul 30 07:07:17 2012
@@ -34,6 +34,7 @@ import org.apache.hadoop.util.Reflection
 
 import javax.net.SocketFactory;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.lang.reflect.Proxy;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
@@ -101,6 +102,13 @@ public class HBaseRPC {
       }
     };
 
+  static long getProtocolVersion(Class<? extends VersionedProtocol> protocol)
+      throws NoSuchFieldException, IllegalAccessException {
+    Field versionField = protocol.getField("VERSION");
+    versionField.setAccessible(true);
+    return versionField.getLong(protocol);
+  }
+
   // set a protocol to use a non-default RpcEngine
   static void setProtocolEngine(Configuration conf,
                                 Class protocol, Class engine) {
@@ -333,16 +341,21 @@ public class HBaseRPC {
       long clientVersion, InetSocketAddress addr, User ticket,
       Configuration conf, SocketFactory factory, int rpcTimeout)
   throws IOException {
-    VersionedProtocol proxy =
-        getProtocolEngine(protocol,conf)
-            .getProxy(protocol, clientVersion, addr, ticket, conf, factory, Math.min(rpcTimeout, HBaseRPC.getRpcTimeout()));
-    long serverVersion = proxy.getProtocolVersion(protocol.getName(),
-                                                  clientVersion);
-    if (serverVersion == clientVersion) {
-      return proxy;
-    }
-    throw new VersionMismatch(protocol.getName(), clientVersion,
+    RpcEngine engine = getProtocolEngine(protocol,conf);
+    VersionedProtocol proxy = engine
+            .getProxy(protocol, clientVersion, addr, ticket, conf, factory,
+                Math.min(rpcTimeout, HBaseRPC.getRpcTimeout()));
+    if (engine instanceof WritableRpcEngine) {
+      long serverVersion = proxy.getProtocolVersion(protocol.getName(),
+          clientVersion);
+      if (serverVersion == clientVersion) {
+        return proxy;
+      }
+
+      throw new VersionMismatch(protocol.getName(), clientVersion,
                               serverVersion);
+    }
+    return proxy;
   }
 
   /**

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1367009&r1=1367008&r2=1367009&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Mon Jul 30 07:07:17 2012
@@ -73,11 +73,11 @@ import org.apache.hadoop.hbase.HConstant
 import org.apache.hadoop.hbase.io.HbaseObjectWritable;
 import org.apache.hadoop.hbase.io.WritableWithSize;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequest;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestHeader;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader.Status;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.security.User;
@@ -381,23 +381,21 @@ public abstract class HBaseServer implem
       }
 
       ByteBufferOutputStream buf = new ByteBufferOutputStream(size);
+      DataOutputStream out = new DataOutputStream(buf);
       try {
-        RpcResponse.Builder builder = RpcResponse.newBuilder();
+        RpcResponseHeader.Builder builder = RpcResponseHeader.newBuilder();
         // Call id.
         builder.setCallId(this.id);
         builder.setStatus(status);
+        builder.build().writeDelimitedTo(out);
         if (error != null) {
           RpcException.Builder b = RpcException.newBuilder();
           b.setExceptionName(errorClass);
           b.setStackTrace(error);
-          builder.setException(b.build());
+          b.build().writeDelimitedTo(out);
         } else {
-          DataOutputBuffer d = new DataOutputBuffer(size);
-          result.write(d);
-          byte[] response = d.getData();
-          builder.setResponse(ByteString.copyFrom(response));
+          result.write(out);
         }
-        builder.build().writeDelimitedTo(buf);
         if (connection.useWrap) {
           wrapWithSasl(buf);
         }
@@ -1616,9 +1614,10 @@ public abstract class HBaseServer implem
     }
 
     protected void processData(byte[] buf) throws  IOException, InterruptedException {
-      RpcRequest request = RpcRequest.parseFrom(buf);
+      DataInputStream dis =
+        new DataInputStream(new ByteArrayInputStream(buf));
+      RpcRequestHeader request = RpcRequestHeader.parseDelimitedFrom(dis);
       int id = request.getCallId();
-      ByteString clientRequest = request.getRequest();
       long callSize = buf.length;
 
       if (LOG.isDebugEnabled()) {
@@ -1639,8 +1638,6 @@ public abstract class HBaseServer implem
 
       Writable param;
       try {
-        DataInputStream dis =
-            new DataInputStream(clientRequest.newInput());
         param = ReflectionUtils.newInstance(paramClass, conf);//read param
         param.readFields(dis);
       } catch (Throwable t) {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java?rev=1367009&r1=1367008&r2=1367009&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java Mon Jul 30 07:07:17 2012
@@ -57,7 +57,7 @@ public class Invocation extends Versione
 
   // For generated protocol classes which don't have VERSION field,
   // such as protobuf interfaces.
-  private static final Map<Class<?>, Long>
+  static final Map<Class<?>, Long>
     PROTOCOL_VERSION = new HashMap<Class<?>, Long>();
 
   static {

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcEngine.java?rev=1367009&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcEngine.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcEngine.java Mon Jul 30 07:07:17 2012
@@ -0,0 +1,501 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.net.SocketFactory;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.DataOutputOutputStream;
+import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.hbase.util.Objects;
+import org.apache.hadoop.hbase.util.ProtoUtil;
+
+import com.google.protobuf.Message;
+import com.google.protobuf.ServiceException;
+/**
+ * The {@link RpcEngine} implementation for ProtoBuf-based RPCs.
+ */
+@InterfaceAudience.Private
+class ProtobufRpcEngine implements RpcEngine {
+  private static final Log LOG =
+      LogFactory.getLog("org.apache.hadoop.hbase.ipc.ProtobufRpcEngine");
+  protected final static ClientCache CLIENTS = new ClientCache();
+  @Override
+  public VersionedProtocol getProxy(
+      Class<? extends VersionedProtocol> protocol, long clientVersion,
+      InetSocketAddress addr, User ticket, Configuration conf,
+      SocketFactory factory, int rpcTimeout) throws IOException {
+    final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
+        rpcTimeout);
+    return (VersionedProtocol)Proxy.newProxyInstance(
+        protocol.getClassLoader(), new Class[]{protocol}, invoker);
+  }
+
+  @Override
+  public void stopProxy(VersionedProtocol proxy) {
+    if (proxy!=null) {
+      ((Invoker)Proxy.getInvocationHandler(proxy)).close();
+    }
+  }
+
+  @Override
+  public Server getServer(Class<? extends VersionedProtocol> protocol,
+      Object instance, Class<?>[] ifaces, String bindAddress, int port,
+      int numHandlers, int metaHandlerCount, boolean verbose,
+      Configuration conf, int highPriorityLevel) throws IOException {
+    return new Server(instance, ifaces, conf, bindAddress, port, numHandlers,
+        metaHandlerCount, verbose, highPriorityLevel);
+  }
+  private static class Invoker implements InvocationHandler {
+    private final Map<String, Message> returnTypes =
+        new ConcurrentHashMap<String, Message>();
+    private Class<? extends VersionedProtocol> protocol;
+    private InetSocketAddress address;
+    private User ticket;
+    private HBaseClient client;
+    private boolean isClosed = false;
+    final private int rpcTimeout;
+    private final long clientProtocolVersion;
+
+    public Invoker(Class<? extends VersionedProtocol> protocol,
+        InetSocketAddress addr, User ticket, Configuration conf,
+        SocketFactory factory, int rpcTimeout) throws IOException {
+      this.protocol = protocol;
+      this.address = addr;
+      this.ticket = ticket;
+      this.client = CLIENTS.getClient(conf, factory, RpcResponseWritable.class);
+      this.rpcTimeout = rpcTimeout;
+      Long version = Invocation.PROTOCOL_VERSION.get(protocol);
+      if (version != null) {
+        this.clientProtocolVersion = version;
+      } else {
+        try {
+          this.clientProtocolVersion = HBaseRPC.getProtocolVersion(protocol);
+        } catch (NoSuchFieldException e) {
+          throw new RuntimeException("Exception encountered during " +
+                                      protocol, e);
+        } catch (IllegalAccessException e) {
+          throw new RuntimeException("Exception encountered during " +
+                                      protocol, e);
+        }
+      }
+    }
+
+    private RpcRequestBody constructRpcRequest(Method method,
+        Object[] params) throws ServiceException {
+      RpcRequestBody rpcRequest;
+      RpcRequestBody.Builder builder = RpcRequestBody.newBuilder();
+      builder.setMethodName(method.getName());
+      Message param;
+      int length = params.length;
+      if (length == 2) {
+        // RpcController + Message in the method args
+        // (generated code from RPC bits in .proto files have RpcController)
+        param = (Message)params[1];
+      } else if (length == 1) { // Message
+        param = (Message)params[0];
+      } else {
+        throw new ServiceException("Too many parameters for request. Method: ["
+            + method.getName() + "]" + ", Expected: 2, Actual: "
+            + params.length);
+      }
+      builder.setRequest(param.toByteString());
+      builder.setClientProtocolVersion(clientProtocolVersion);
+      rpcRequest = builder.build();
+      return rpcRequest;
+    }
+
+    /**
+     * This is the client-side invoker of an RPC method. It only throws
+     * ServiceException, since the invocation proxy expects only
+     * ServiceException to be thrown by the method of a protobuf service.
+     *
+     * ServiceException has the following causes:
+     * <ol>
+     * <li>Exceptions encountered on the client side in this method are
+     * set as cause in ServiceException as is.</li>
+     * <li>Exceptions from the server are wrapped in RemoteException and are
+     * set as cause in ServiceException</li>
+     * </ol>
+     *
+     * Note that a client calling protobuf RPC methods must handle
+     * ServiceException by getting the cause from the ServiceException. If the
+     * cause is RemoteException, then unwrap it to get the exception thrown by
+     * the server.
+     */
+    @Override
+    public Object invoke(Object proxy, Method method, Object[] args)
+        throws ServiceException {
+      long startTime = 0;
+      if (LOG.isDebugEnabled()) {
+        startTime = System.currentTimeMillis();
+      }
+
+      RpcRequestBody rpcRequest = constructRpcRequest(method, args);
+      RpcResponseWritable val = null;
+      try {
+        val = (RpcResponseWritable) client.call(
+            new RpcRequestWritable(rpcRequest), address, protocol, ticket,
+            rpcTimeout);
+
+        if (LOG.isDebugEnabled()) {
+          long callTime = System.currentTimeMillis() - startTime;
+          LOG.debug("Call: " + method.getName() + " " + callTime);
+        }
+
+        Message protoType = null;
+        protoType = getReturnProtoType(method);
+        Message returnMessage;
+        returnMessage = protoType.newBuilderForType()
+            .mergeFrom(val.responseMessage).build();
+        return returnMessage;
+      } catch (Throwable e) {
+        throw new ServiceException(e);
+      }
+    }
+
+    synchronized protected void close() {
+      if (!isClosed) {
+        isClosed = true;
+        CLIENTS.stopClient(client);
+      }
+    }
+
+    private Message getReturnProtoType(Method method) throws Exception {
+      if (returnTypes.containsKey(method.getName())) {
+        return returnTypes.get(method.getName());
+      }
+
+      Class<?> returnType = method.getReturnType();
+      Method newInstMethod = returnType.getMethod("getDefaultInstance");
+      newInstMethod.setAccessible(true);
+      Message protoType = (Message) newInstMethod.invoke(null, (Object[]) null);
+      returnTypes.put(method.getName(), protoType);
+      return protoType;
+    }
+  }
+
+  /**
+   * Writable Wrapper for Protocol Buffer Requests
+   */
+  private static class RpcRequestWritable implements Writable {
+    RpcRequestBody message;
+
+    @SuppressWarnings("unused")
+    public RpcRequestWritable() {
+    }
+
+    RpcRequestWritable(RpcRequestBody message) {
+      this.message = message;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      ((Message)message).writeDelimitedTo(
+          DataOutputOutputStream.constructOutputStream(out));
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      int length = ProtoUtil.readRawVarint32(in);
+      byte[] bytes = new byte[length];
+      in.readFully(bytes);
+      message = RpcRequestBody.parseFrom(bytes);
+    }
+    
+    public int getSerializedSize() {
+      return message.getSerializedSize();
+    }
+
+    @Override
+    public String toString() {
+      return " Client Protocol Version: " +
+          message.getClientProtocolVersion() + " MethodName: " +
+          message.getMethodName();
+    }
+  }
+
+  /**
+   * Writable Wrapper for Protocol Buffer Responses
+   */
+  private static class RpcResponseWritable implements Writable {
+    byte[] responseMessage;
+
+    @SuppressWarnings("unused")
+    public RpcResponseWritable() {
+    }
+
+    public RpcResponseWritable(Message message) {
+      this.responseMessage = message.toByteArray();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeInt(responseMessage.length);
+      out.write(responseMessage);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      int length = in.readInt();
+      byte[] bytes = new byte[length];
+      in.readFully(bytes);
+      responseMessage = bytes;
+    }
+  }
+  public static class Server extends WritableRpcEngine.Server {
+    boolean verbose;
+    Object instance;
+    Class<?> implementation;
+    private static final String WARN_RESPONSE_TIME =
+        "hbase.ipc.warn.response.time";
+    private static final String WARN_RESPONSE_SIZE =
+        "hbase.ipc.warn.response.size";
+
+    /** Default value for above params */
+    private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
+    private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
+
+    /** Names for suffixed metrics */
+    private static final String ABOVE_ONE_SEC_METRIC = ".aboveOneSec.";
+
+    private final int warnResponseTime;
+    private final int warnResponseSize;
+    public Server(Object instance, final Class<?>[] ifaces,
+        Configuration conf, String bindAddress,  int port,
+        int numHandlers, int metaHandlerCount, boolean verbose,
+        int highPriorityLevel)
+        throws IOException {
+      super(instance, ifaces, RpcRequestWritable.class, conf, bindAddress, port,
+          numHandlers, metaHandlerCount, verbose, highPriorityLevel);
+      this.verbose = verbose;
+      this.instance = instance;
+      this.implementation = instance.getClass();
+      // create metrics for the advertised interfaces this server implements.
+      String [] metricSuffixes = new String [] {ABOVE_ONE_SEC_METRIC};
+      this.rpcMetrics.createMetrics(ifaces, false, metricSuffixes);
+
+      this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME,
+          DEFAULT_WARN_RESPONSE_TIME);
+      this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE,
+          DEFAULT_WARN_RESPONSE_SIZE);
+    }
+    private final Map<String, Message> methodArg =
+        new ConcurrentHashMap<String, Message>();
+    private final Map<String, Method> methodInstances =
+        new ConcurrentHashMap<String, Method>();
+    @Override
+    /**
+     * This is a server-side method, which is invoked over RPC. On success
+     * the returned response carries the protobuf response payload. On failure,
+     * the exception name and stack trace are returned in the protobuf response.
+     */
+    public Writable call(Class<? extends VersionedProtocol> protocol,
+        Writable writableRequest, long receiveTime, MonitoredRPCHandler status)
+        throws IOException {
+      try {
+        RpcRequestWritable request = (RpcRequestWritable) writableRequest;
+        RpcRequestBody rpcRequest = request.message;
+        String methodName = rpcRequest.getMethodName();
+        Method method = getMethod(protocol, methodName);
+        if (method == null) {
+          throw new HBaseRPC.UnknownProtocolException("Method " + methodName +
+              " doesn't exist in protocol " + protocol.getName());
+        }
+
+        /**
+         * RPCs for a particular interface (ie protocol) are done using a
+         * IPC connection that is setup using rpcProxy.
+         * The rpcProxy's has a declared protocol name that is
+         * sent form client to server at connection time.
+         */
+        //TODO: use the clientVersion to do protocol compatibility checks, and
+        //this could be used here to handle complex use cases like deciding
+        //which implementation of the protocol should be used to service the
+        //current request, etc. Ideally, we shouldn't land up in a situation
+        //where we need to support such a use case.
+        //For now the clientVersion field is simply ignored 
+        long clientVersion = rpcRequest.getClientProtocolVersion();
+
+        if (verbose) {
+          LOG.info("Call: protocol name=" + protocol.getName() +
+              ", method=" + methodName);
+        }
+
+        status.setRPC(rpcRequest.getMethodName(), 
+            new Object[]{rpcRequest.getRequest()}, receiveTime);
+        status.setRPCPacket(writableRequest);
+        status.resume("Servicing call");        
+        //get an instance of the method arg type
+        Message protoType = getMethodArgType(method);
+        Message param = protoType.newBuilderForType()
+            .mergeFrom(rpcRequest.getRequest()).build();
+        Message result;
+        Object impl = null;
+        if (protocol.isAssignableFrom(this.implementation)) {
+          impl = this.instance;
+        } else {
+          throw new HBaseRPC.UnknownProtocolException(protocol);
+        }
+
+        long startTime = System.currentTimeMillis();
+        if (method.getParameterTypes().length == 2) {
+          // RpcController + Message in the method args
+          // (generated code from RPC bits in .proto files have RpcController)
+          result = (Message)method.invoke(impl, null, param);
+        } else if (method.getParameterTypes().length == 1) {
+          // Message (hand written code usually has only a single argument)
+          result = (Message)method.invoke(impl, param);
+        } else {
+          throw new ServiceException("Too many parameters for method: ["
+              + method.getName() + "]" + ", allowed (at most): 2, Actual: "
+              + method.getParameterTypes().length);
+        }
+        int processingTime = (int) (System.currentTimeMillis() - startTime);
+        int qTime = (int) (startTime-receiveTime);
+        if (TRACELOG.isDebugEnabled()) {
+          TRACELOG.debug("Call #" + CurCall.get().id +
+              "; Served: " + protocol.getSimpleName()+"#"+method.getName() +
+              " queueTime=" + qTime +
+              " processingTime=" + processingTime +
+              " contents=" + Objects.describeQuantity(param));
+        }
+        rpcMetrics.rpcQueueTime.inc(qTime);
+        rpcMetrics.rpcProcessingTime.inc(processingTime);
+        rpcMetrics.inc(method.getName(), processingTime);
+        if (verbose) {
+          WritableRpcEngine.log("Return: "+result, LOG);
+        }
+        long responseSize = result.getSerializedSize();
+        // log any RPC responses that are slower than the configured warn
+        // response time or larger than configured warning size
+        boolean tooSlow = (processingTime > warnResponseTime
+            && warnResponseTime > -1);
+        boolean tooLarge = (responseSize > warnResponseSize
+            && warnResponseSize > -1);
+        if (tooSlow || tooLarge) {
+          // when tagging, we let TooLarge trump TooSmall to keep output simple
+          // note that large responses will often also be slow.
+          StringBuilder buffer = new StringBuilder(256);
+          buffer.append(methodName);
+          buffer.append("(");
+          buffer.append(param.getClass().getName());
+          buffer.append(")");
+          buffer.append(", client version="+clientVersion);
+          logResponse(new Object[]{rpcRequest.getRequest()}, 
+              methodName, buffer.toString(), (tooLarge ? "TooLarge" : "TooSlow"),
+              status.getClient(), startTime, processingTime, qTime,
+              responseSize);
+          // provides a count of log-reported slow responses
+          if (tooSlow) {
+            rpcMetrics.rpcSlowResponseTime.inc(processingTime);
+          }
+        }
+        if (processingTime > 1000) {
+          // we use a hard-coded one second period so that we can clearly
+          // indicate the time period we're warning about in the name of the
+          // metric itself
+          rpcMetrics.inc(method.getName() + ABOVE_ONE_SEC_METRIC,
+              processingTime);
+        }
+        return new RpcResponseWritable(result);
+      } catch (InvocationTargetException e) {
+        Throwable target = e.getTargetException();
+        if (target instanceof IOException) {
+          throw (IOException)target;
+        }
+        if (target instanceof ServiceException) {
+          throw ProtobufUtil.getRemoteException((ServiceException)target);
+        }
+        IOException ioe = new IOException(target.toString());
+        ioe.setStackTrace(target.getStackTrace());
+        throw ioe;
+      } catch (Throwable e) {
+        if (!(e instanceof IOException)) {
+          LOG.error("Unexpected throwable object ", e);
+        }
+        IOException ioe = new IOException(e.toString());
+        ioe.setStackTrace(e.getStackTrace());
+        throw ioe;
+      }
+    }
+
+    /**
+     * Finds the method called {@code methodName} on {@code protocol}, caching
+     * the reflective lookup for later calls.
+     *
+     * <p>The first public method whose name matches is used; overloads are
+     * not distinguished.
+     *
+     * @param protocol the protocol interface to search
+     * @param methodName name of the method requested by the client
+     * @return the matching method, or {@code null} if the protocol has no
+     *         method with that name
+     */
+    private Method getMethod(Class<? extends VersionedProtocol> protocol,
+        String methodName) {
+      // The server can expose several protocol interfaces, so the cache key
+      // must include the protocol; a bare method name would hand back a
+      // method of whichever protocol happened to be looked up first.
+      String cacheKey = protocol.getName() + "#" + methodName;
+      Method method = methodInstances.get(cacheKey);
+      if (method != null) {
+        return method;
+      }
+      Method[] methods = protocol.getMethods();
+      // Diagnostic only: this is not a warning condition.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Methods length : " + methods.length);
+      }
+      for (Method m : methods) {
+        if (m.getName().equals(methodName)) {
+          m.setAccessible(true);
+          // A concurrent miss may store the same mapping twice; harmless.
+          methodInstances.put(cacheKey, m);
+          return m;
+        }
+      }
+      return null;
+    }
+
+    /**
+     * Returns the protobuf prototype (default instance) of the request
+     * message taken by {@code method}, caching it for later calls.
+     *
+     * <p>For two-argument methods (RpcController + Message, as produced by
+     * generated service code) the second parameter is the request type; for
+     * single-argument methods the only parameter is.
+     *
+     * @param method the protocol method about to be invoked
+     * @return the default instance of the method's request message type
+     * @throws ServiceException if the method takes neither one nor two
+     *           parameters
+     * @throws Exception if {@code getDefaultInstance} cannot be found or
+     *           invoked reflectively on the parameter type
+     */
+    private Message getMethodArgType(Method method) throws Exception {
+      // Qualify the key with the declaring type: same-named methods on
+      // different protocol interfaces may take different request messages.
+      String cacheKey =
+          method.getDeclaringClass().getName() + "#" + method.getName();
+      Message protoType = methodArg.get(cacheKey);
+      if (protoType != null) {
+        return protoType;
+      }
+
+      Class<?>[] args = method.getParameterTypes();
+      Class<?> arg;
+      if (args.length == 2) {
+        // RpcController + Message in the method args
+        // (generated code from RPC bits in .proto files have RpcController)
+        arg = args[1];
+      } else if (args.length == 1) {
+        // Message only (hand written code usually has a single argument)
+        arg = args[0];
+      } else {
+        // Fail with a descriptive error instead of returning null, which the
+        // caller would dereference and surface as a NullPointerException.
+        throw new ServiceException("Unexpected number of parameters for method: ["
+            + method.getName() + "], expected 1 or 2, actual: " + args.length);
+      }
+      // getDefaultInstance is static, hence the null receiver below
+      Method newInstMethod = arg.getMethod("getDefaultInstance");
+      newInstMethod.setAccessible(true);
+      protoType = (Message) newInstMethod.invoke(null, (Object[]) null);
+      methodArg.put(cacheKey, protoType);
+      return protoType;
+    }
+  }
+}
\ No newline at end of file

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java?rev=1367009&r1=1367008&r2=1367009&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java Mon Jul 30 07:07:17 2012
@@ -28,18 +28,14 @@ import java.lang.reflect.UndeclaredThrow
 
 import java.net.InetSocketAddress;
 import java.io.*;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.HashMap;
-import java.util.Set;
 
 import javax.net.SocketFactory;
 
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.client.AdminProtocol;
-import org.apache.hadoop.hbase.client.ClientProtocol;
 import org.apache.hadoop.hbase.client.Operation;
 import org.apache.hadoop.hbase.io.HbaseObjectWritable;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
@@ -69,57 +65,6 @@ class WritableRpcEngine implements RpcEn
   // DEBUG log level does NOT emit RPC-level logging.
   private static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.RPCEngine");
 
-  /* Cache a client using its socket factory as the hash key */
-  static private class ClientCache {
-    private Map<SocketFactory, HBaseClient> clients =
-      new HashMap<SocketFactory, HBaseClient>();
-
-    protected ClientCache() {}
-
-    /**
-     * Construct & cache an IPC client with the user-provided SocketFactory
-     * if no cached client exists.
-     *
-     * @param conf Configuration
-     * @param factory socket factory
-     * @return an IPC client
-     */
-    protected synchronized HBaseClient getClient(Configuration conf,
-        SocketFactory factory) {
-      // Construct & cache client.  The configuration is only used for timeout,
-      // and Clients have connection pools.  So we can either (a) lose some
-      // connection pooling and leak sockets, or (b) use the same timeout for
-      // all configurations.  Since the IPC is usually intended globally, not
-      // per-job, we choose (a).
-      HBaseClient client = clients.get(factory);
-      if (client == null) {
-        // Make an hbase client instead of hadoop Client.
-        client = new HBaseClient(HbaseObjectWritable.class, conf, factory);
-        clients.put(factory, client);
-      } else {
-        client.incCount();
-      }
-      return client;
-    }
-
-    /**
-     * Stop a RPC client connection
-     * A RPC client is closed only when its reference count becomes zero.
-     * @param client client to stop
-     */
-    protected void stopClient(HBaseClient client) {
-      synchronized (this) {
-        client.decCount();
-        if (client.isZeroReference()) {
-          clients.remove(client.getSocketFactory());
-        }
-      }
-      if (client.isZeroReference()) {
-        client.stop();
-      }
-    }
-  }
-
   protected final static ClientCache CLIENTS = new ClientCache();
 
   private static class Invoker implements InvocationHandler {
@@ -150,8 +95,8 @@ class WritableRpcEngine implements RpcEn
 
       try {
         HbaseObjectWritable value = (HbaseObjectWritable)
-          client.call(new Invocation(method, args), address,
-                      protocol, ticket, rpcTimeout);
+          client.call(new Invocation(method, args), address, protocol, ticket, 
+                      rpcTimeout);
         if (logDebug) {
           // FIGURE HOW TO TURN THIS OFF!
           long callTime = System.currentTimeMillis() - startTime;
@@ -271,18 +216,23 @@ class WritableRpcEngine implements RpcEn
 
     /** Construct an RPC server.
      * @param instance the instance whose methods will be called
+     * @param ifaces the interfaces the server supports
+     * @param paramClass an instance of this class is used to read the RPC requests
      * @param conf the configuration to use
      * @param bindAddress the address to bind on to listen for connection
      * @param port the port to listen for connections on
      * @param numHandlers the number of method handler threads to run
+     * @param metaHandlerCount the number of meta handlers desired
      * @param verbose whether each call should be logged
+     * @param highPriorityLevel the priority level this server treats as high priority RPCs
      * @throws IOException e
      */
     public Server(Object instance, final Class<?>[] ifaces,
+                  Class<? extends Writable> paramClass,
                   Configuration conf, String bindAddress,  int port,
                   int numHandlers, int metaHandlerCount, boolean verbose,
                   int highPriorityLevel) throws IOException {
-      super(bindAddress, port, Invocation.class, numHandlers, metaHandlerCount,
+      super(bindAddress, port, paramClass, numHandlers, metaHandlerCount,
           conf, classNameBase(instance.getClass().getName()),
           highPriorityLevel);
       this.instance = instance;
@@ -301,6 +251,14 @@ class WritableRpcEngine implements RpcEn
           DEFAULT_WARN_RESPONSE_SIZE);
     }
 
+    public Server(Object instance, final Class<?>[] ifaces,
+        Configuration conf, String bindAddress,  int port,
+        int numHandlers, int metaHandlerCount, boolean verbose,
+        int highPriorityLevel) throws IOException {
+      this(instance, ifaces, Invocation.class, conf, bindAddress, port, 
+          numHandlers, metaHandlerCount, verbose, highPriorityLevel);
+    }
+
     public AuthenticationTokenSecretManager createSecretManager(){
       if (!User.isSecurityEnabled() ||
           !(instance instanceof org.apache.hadoop.hbase.Server)) {
@@ -341,7 +299,7 @@ class WritableRpcEngine implements RpcEn
           throw new IOException("Could not find requested method, the usual " +
               "cause is a version mismatch between client and server.");
         }
-        if (verbose) log("Call: " + call);
+        if (verbose) log("Call: " + call, LOG);
         status.setRPC(call.getMethodName(), call.getParameters(), receivedTime);
         status.setRPCPacket(param);
         status.resume("Servicing call");
@@ -389,7 +347,7 @@ class WritableRpcEngine implements RpcEn
         rpcMetrics.rpcQueueTime.inc(qTime);
         rpcMetrics.rpcProcessingTime.inc(processingTime);
         rpcMetrics.inc(call.getMethodName(), processingTime);
-        if (verbose) log("Return: "+value);
+        if (verbose) log("Return: "+value, LOG);
 
         HbaseObjectWritable retVal =
           new HbaseObjectWritable(method.getReturnType(), value);
@@ -403,7 +361,8 @@ class WritableRpcEngine implements RpcEn
         if (tooSlow || tooLarge) {
           // when tagging, we let TooLarge trump TooSmall to keep output simple
           // note that large responses will often also be slow.
-          logResponse(call, (tooLarge ? "TooLarge" : "TooSlow"),
+          logResponse(call.getParameters(), call.getMethodName(), 
+              call.toString(), (tooLarge ? "TooLarge" : "TooSlow"),
               status.getClient(), startTime, processingTime, qTime,
               responseSize);
           // provides a count of log-reported slow responses
@@ -444,7 +403,9 @@ class WritableRpcEngine implements RpcEn
     /**
      * Logs an RPC response to the LOG file, producing valid JSON objects for
      * client Operations.
-     * @param call The call to log.
+     * @param params The parameters received in the call.
+     * @param methodName The name of the method invoked
+     * @param call The string representation of the call
      * @param tag  The tag that will be used to indicate this event in the log.
      * @param client          The address of the client who made this call.
      * @param startTime       The time that the call was initiated, in ms.
@@ -453,10 +414,10 @@ class WritableRpcEngine implements RpcEn
      *                        prior to being initiated, in ms.
      * @param responseSize    The size in bytes of the response buffer.
      */
-    private void logResponse(Invocation call, String tag, String clientAddress,
-        long startTime, int processingTime, int qTime, long responseSize)
+     void logResponse(Object[] params, String methodName, String call, String tag, 
+         String clientAddress, long startTime, int processingTime, int qTime, 
+         long responseSize)
       throws IOException {
-      Object params[] = call.getParameters();
       // for JSON encoding
       ObjectMapper mapper = new ObjectMapper();
       // base information that is reported regardless of type of call
@@ -467,7 +428,7 @@ class WritableRpcEngine implements RpcEn
       responseInfo.put("responsesize", responseSize);
       responseInfo.put("client", clientAddress);
       responseInfo.put("class", instance.getClass().getSimpleName());
-      responseInfo.put("method", call.getMethodName());
+      responseInfo.put("method", methodName);
       if (params.length == 2 && instance instanceof HRegionServer &&
           params[0] instanceof byte[] &&
           params[1] instanceof Operation) {
@@ -491,14 +452,14 @@ class WritableRpcEngine implements RpcEn
       } else {
         // can't get JSON details, so just report call.toString() along with 
         // a more generic tag.
-        responseInfo.put("call", call.toString());
+        responseInfo.put("call", call);
         LOG.warn("(response" + tag + "): " +
             mapper.writeValueAsString(responseInfo));
       }
     }
   }
 
-  protected static void log(String value) {
+  protected static void log(String value, Log LOG) {
     String v = value;
     if (v != null && v.length() > 55)
       v = v.substring(0, 55)+"...";

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1367009&r1=1367008&r2=1367009&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Mon Jul 30 07:07:17 2012
@@ -80,6 +80,8 @@ import org.apache.hadoop.hbase.executor.
 import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
 import org.apache.hadoop.hbase.ipc.HBaseRPC;
 import org.apache.hadoop.hbase.ipc.HBaseServer;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.ipc.ProtocolSignature;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
@@ -99,8 +101,6 @@ import org.apache.hadoop.hbase.master.me
 import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
@@ -331,7 +331,7 @@ Server {
     }
     int numHandlers = conf.getInt("hbase.master.handler.count",
       conf.getInt("hbase.regionserver.handler.count", 25));
-    this.rpcServer = HBaseRPC.getServer(this,
+    this.rpcServer = HBaseRPC.getServer(MasterMonitorProtocol.class, this,
       new Class<?>[]{MasterMonitorProtocol.class,
         MasterAdminProtocol.class, RegionServerStatusProtocol.class},
         initialIsa.getHostName(), // BindAddress is IP we got for this server.

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java?rev=1367009&r1=1367008&r2=1367009&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java Mon Jul 30 07:07:17 2012
@@ -261,5 +261,4 @@ public class MonitoredRPCHandlerImpl ext
     }
     return super.toString() + ", rpcMethod=" + getRPC();
   }
-
 }