Posted to commits@hbase.apache.org by st...@apache.org on 2012/04/03 18:26:51 UTC

svn commit: r1309019 [1/3] - in /hbase/trunk/src/main: java/org/apache/hadoop/hbase/io/ java/org/apache/hadoop/hbase/ipc/ java/org/apache/hadoop/hbase/protobuf/generated/ java/org/apache/hadoop/hbase/security/ protobuf/

Author: stack
Date: Tue Apr  3 16:26:50 2012
New Revision: 1309019

URL: http://svn.apache.org/viewvc?rev=1309019&view=rev
Log:
HBASE-5451 Switch RPC call envelope/headers to PBs

Added:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java
    hbase/trunk/src/main/protobuf/RPC.proto
Modified:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/DataOutputOutputStream.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/User.java

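The patch replaces the hand-rolled Writable envelope with three protobuf messages defined in the new RPC.proto (its source is not shown in this part of the commit mail). A minimal sketch of how the generated classes are used, based only on the builder calls visible in the diffs below; the class name and the literal values here are illustrative, not part of the patch:

    import com.google.protobuf.ByteString;
    import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
    import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException;
    import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequest;
    import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse;

    public class EnvelopeSketch {
      public static void main(String[] args) {
        // Sent once per connection: just the protocol class name for now.
        ConnectionHeader header = ConnectionHeader.newBuilder()
            .setProtocol("org.apache.hadoop.hbase.ipc.HRegionInterface")
            .build();

        // Sent per call: the call id plus the Invocation, still serialized
        // as a Writable and carried as opaque bytes.
        RpcRequest request = RpcRequest.newBuilder()
            .setCallId(1)
            .setRequest(ByteString.copyFrom(new byte[0] /* Invocation bytes */))
            .build();

        // Returned per call: either the Writable-encoded result ...
        RpcResponse success = RpcResponse.newBuilder()
            .setCallId(1)
            .setError(false)
            .setResponse(ByteString.copyFrom(new byte[0] /* result bytes */))
            .build();

        // ... or an error flag plus the exception name and stringified trace.
        RpcResponse failure = RpcResponse.newBuilder()
            .setCallId(1)
            .setError(true)
            .setException(RpcException.newBuilder()
                .setExceptionName("java.io.IOException")
                .setStackTrace("stringified stack trace")
                .build())
            .build();
      }
    }
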
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/DataOutputOutputStream.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/DataOutputOutputStream.java?rev=1309019&r1=1309018&r2=1309019&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/DataOutputOutputStream.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/DataOutputOutputStream.java Tue Apr  3 16:26:50 2012
@@ -27,7 +27,7 @@ import org.apache.hadoop.classification.
  * OutputStream implementation that wraps a DataOutput.
  */
 @InterfaceAudience.Private
-class DataOutputOutputStream extends OutputStream {
+public class DataOutputOutputStream extends OutputStream {
 
   private final DataOutput out;
 

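The only change to this class is visibility: the ipc code below uses its static constructOutputStream(...) helper to hand protobuf messages an OutputStream view of whatever DataOutput it holds. A minimal sketch of that pattern, assuming an arbitrary DataOutput target; everything except the constructOutputStream call is illustrative:

    import java.io.DataOutput;
    import java.io.IOException;
    import java.io.OutputStream;
    import org.apache.hadoop.hbase.io.DataOutputOutputStream;
    import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;

    public class AdapterSketch {
      // Write a protobuf message to something that is only typed as DataOutput.
      static void write(DataOutput out, ConnectionHeader header) throws IOException {
        OutputStream os = DataOutputOutputStream.constructOutputStream(out);
        out.writeInt(header.getSerializedSize());  // 4-byte length prefix
        header.writeTo(os);                        // protobuf writes to an OutputStream
      }
    }
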
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=1309019&r1=1309018&r2=1309019&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Tue Apr  3 16:26:50 2012
@@ -46,18 +46,23 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequest;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse;
 import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.PoolMap;
 import org.apache.hadoop.hbase.util.PoolMap.PoolType;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.hbase.io.DataOutputOutputStream;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.ReflectionUtils;
 
+import com.google.protobuf.ByteString;
+
 /** A client for an IPC service.  IPC calls take a single {@link Writable} as a
  * parameter, and return a {@link Writable} as their value.  A service runs on
  * a port and is defined by a parameter class and a value class.
@@ -233,8 +238,9 @@ public class HBaseClient {
       User ticket = remoteId.getTicket();
       Class<? extends VersionedProtocol> protocol = remoteId.getProtocol();
 
-      header = new ConnectionHeader(
-          protocol == null ? null : protocol.getName(), ticket);
+      ConnectionHeader.Builder builder = ConnectionHeader.newBuilder();
+      builder.setProtocol(protocol == null ? "" : protocol.getName());
+      this.header = builder.build();
 
       this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " +
         remoteId.getAddress().toString() +
@@ -436,13 +442,8 @@ public class HBaseClient {
     private void writeHeader() throws IOException {
       out.write(HBaseServer.HEADER.array());
       out.write(HBaseServer.CURRENT_VERSION);
-      //When there are more fields we can have ConnectionHeader Writable.
-      DataOutputBuffer buf = new DataOutputBuffer();
-      header.write(buf);
-
-      int bufLen = buf.getLength();
-      out.writeInt(bufLen);
-      out.write(buf.getData(), 0, bufLen);
+      out.writeInt(header.getSerializedSize());
+      header.writeTo(out);
     }
 
     /* wait till someone signals us to start reading RPC response or
@@ -451,7 +452,6 @@ public class HBaseClient {
      *
      * Return true if it is time to read a response; false otherwise.
      */
-    @SuppressWarnings({"ThrowableInstanceNeverThrown"})
     protected synchronized boolean waitForWork() {
       if (calls.isEmpty() && !shouldCloseConnection.get()  && running.get())  {
         long timeout = maxIdleTime-
@@ -526,32 +526,24 @@ public class HBaseClient {
       if (shouldCloseConnection.get()) {
         return;
       }
-
-      // For serializing the data to be written.
-
-      final DataOutputBuffer d = new DataOutputBuffer();
       try {
         if (LOG.isDebugEnabled())
           LOG.debug(getName() + " sending #" + call.id);
-
-        d.writeInt(0xdeadbeef); // placeholder for data length
-        d.writeInt(call.id);
-        call.param.write(d);
-        byte[] data = d.getData();
-        int dataLength = d.getLength();
-        // fill in the placeholder
-        Bytes.putInt(data, 0, dataLength - 4);
+        RpcRequest.Builder builder = RPCProtos.RpcRequest.newBuilder();
+        builder.setCallId(call.id);
+        Invocation invocation = (Invocation)call.param;
+        DataOutputBuffer d = new DataOutputBuffer();
+        invocation.write(d);
+        builder.setRequest(ByteString.copyFrom(d.getData()));
         //noinspection SynchronizeOnNonFinalField
         synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC
-          out.write(data, 0, dataLength);
-          out.flush();
+          RpcRequest obj = builder.build();
+          this.out.writeInt(obj.getSerializedSize());
+          obj.writeTo(DataOutputOutputStream.constructOutputStream(this.out));
+          this.out.flush();
         }
       } catch(IOException e) {
         markClosed(e);
-      } finally {
-        //the buffer is just an in-memory buffer, but it is still polite to
-        // close early
-        IOUtils.closeStream(d);
       }
     }
 
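Net effect of this hunk: the 0xdeadbeef length back-patching is gone, and each call now goes out as a plain 4-byte length followed by one RpcRequest, with the Invocation still Writable-serialized inside the request field. A condensed sketch of the new client-side framing; class and method names here are illustrative:

    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequest;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.io.Writable;
    import com.google.protobuf.ByteString;

    public class RequestFramingSketch {
      // Wire format per call: int32 length, then the RpcRequest bytes.
      static void sendCall(DataOutputStream out, int callId, Writable invocation)
          throws IOException {
        DataOutputBuffer d = new DataOutputBuffer();
        invocation.write(d);                     // parameters stay Writable-encoded
        RpcRequest req = RpcRequest.newBuilder()
            .setCallId(callId)
            // The patch copies d.getData() whole; trimming to getLength() as
            // done here avoids including the buffer's trailing slack bytes.
            .setRequest(ByteString.copyFrom(d.getData(), 0, d.getLength()))
            .build();
        out.writeInt(req.getSerializedSize());   // raw 4-byte prefix, not varint
        req.writeTo(out);
        out.flush();
      }
    }
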
@@ -566,33 +558,31 @@ public class HBaseClient {
 
       try {
         // See HBaseServer.Call.setResponse for where we write out the response.
-        // It writes the call.id (int), a flag byte, then optionally the length
-        // of the response (int) followed by data.
+        // It writes the call.id (int), a boolean signifying any error (and if 
+        // so the exception name/trace), and the response bytes
 
         // Read the call id.
-        int id = in.readInt();
+        RpcResponse response = RpcResponse.parseDelimitedFrom(in);
+        int id = response.getCallId();
 
         if (LOG.isDebugEnabled())
           LOG.debug(getName() + " got value #" + id);
         Call call = calls.remove(id);
 
-        // Read the flag byte
-        byte flag = in.readByte();
-        boolean isError = ResponseFlag.isError(flag);
-        if (ResponseFlag.isLength(flag)) {
-          // Currently length if present is unused.
-          in.readInt();
-        }
-        int state = in.readInt(); // Read the state.  Currently unused.
+        boolean isError = response.getError();
         if (isError) {
           if (call != null) {
             //noinspection ThrowableInstanceNeverThrown
-            call.setException(new RemoteException(WritableUtils.readString(in),
-                WritableUtils.readString(in)));
+            call.setException(new RemoteException(
+                response.getException().getExceptionName(),
+                response.getException().getStackTrace()));
           }
         } else {
+          ByteString responseObj = response.getResponse();
+          DataInputStream dis =
+              new DataInputStream(responseObj.newInput());
           Writable value = ReflectionUtils.newInstance(valueClass, conf);
-          value.readFields(in);                 // read value
+          value.readFields(dis);                 // read value
           // it's possible that this call may have been cleaned up due to a RPC
           // timeout, so check if it still exists before setting the value.
           if (call != null) {

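Note the asymmetry this introduces: requests are framed with a raw 4-byte length (so the server's existing read-an-int-then-a-buffer loop keeps working), while responses come back as protobuf-delimited, i.e. varint-prefixed, RpcResponse messages. A condensed sketch of the read path above; names other than the RPCProtos calls are illustrative:

    import java.io.DataInputStream;
    import java.io.IOException;
    import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.ipc.RemoteException;

    public class ResponseReadSketch {
      // Read one varint-delimited RpcResponse and unpack it into the Writable
      // result, or surface the carried exception.
      static void readOneResponse(DataInputStream in, Writable value)
          throws IOException {
        RpcResponse response = RpcResponse.parseDelimitedFrom(in);
        int id = response.getCallId();           // used to look up the pending Call
        if (response.getError()) {
          throw new RemoteException(response.getException().getExceptionName(),
              response.getException().getStackTrace());
        }
        DataInputStream dis = new DataInputStream(response.getResponse().newInput());
        value.readFields(dis);                   // result is still a Writable inside
      }
    }
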
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1309019&r1=1309018&r2=1309019&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Tue Apr  3 16:26:50 2012
@@ -59,22 +59,28 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.DataOutputOutputStream;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.io.HbaseObjectWritable;
 import org.apache.hadoop.hbase.io.WritableWithSize;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequest;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.ByteBufferOutputStream;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.ipc.RPC.VersionMismatch;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.base.Function;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.ByteString;
 
 import org.cliffc.high_scale_lib.Counter;
 
@@ -94,7 +100,7 @@ public abstract class HBaseServer implem
    * The first four bytes of Hadoop RPC connections
    */
   public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
-  public static final byte CURRENT_VERSION = 3;
+  public static final byte CURRENT_VERSION = 5;
 
   /**
    * How many calls/handler are allowed in the queue.
@@ -333,40 +339,27 @@ public abstract class HBaseServer implem
       ByteBufferOutputStream buf = new ByteBufferOutputStream(size);
       DataOutputStream out = new DataOutputStream(buf);
       try {
+        RpcResponse.Builder builder = RpcResponse.newBuilder();
         // Call id.
-        out.writeInt(this.id);
-        // Write flag.
-        byte flag = (error != null)?
-          ResponseFlag.getErrorAndLengthSet(): ResponseFlag.getLengthSetOnly();
-        out.writeByte(flag);
-        // Place holder for length set later below after we
-        // fill the buffer with data.
-        out.writeInt(0xdeadbeef);
-        out.writeInt(status.state);
-      } catch (IOException e) {
-        errorClass = e.getClass().getName();
-        error = StringUtils.stringifyException(e);
-      }
-
-      try {
-        if (error == null) {
-          result.write(out);
+        builder.setCallId(this.id);
+        builder.setError(error != null);
+        if (error != null) {
+          RpcException.Builder b = RpcException.newBuilder();
+          b.setExceptionName(errorClass);
+          b.setStackTrace(error);
+          builder.setException(b.build());
         } else {
-          WritableUtils.writeString(out, errorClass);
-          WritableUtils.writeString(out, error);
+          DataOutputBuffer d = new DataOutputBuffer(size);
+          result.write(d);
+          byte[] response = d.getData();
+          builder.setResponse(ByteString.copyFrom(response));
         }
+        builder.build().writeDelimitedTo(
+            DataOutputOutputStream.constructOutputStream(out));
       } catch (IOException e) {
-        LOG.warn("Error sending response to call: ", e);
+        LOG.warn("Exception while creating response " + e);
       }
-
-      // Set the length into the ByteBuffer after call id and after
-      // byte flag.
       ByteBuffer bb = buf.getByteBuffer();
-      int bufSiz = bb.remaining();
-      // Move to the size location in our ByteBuffer past call.id
-      // and past the byte flag.
-      bb.position(Bytes.SIZEOF_INT + Bytes.SIZEOF_BYTE); 
-      bb.putInt(bufSiz);
       bb.position(0);
       this.response = bb;
     }
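
With the whole response expressed as one RpcResponse, setResponse() no longer needs the placeholder length that used to be back-patched into the buffer; writeDelimitedTo() emits the varint length and the message in one go. A condensed server-side sketch, with illustrative names and a rough initial buffer size:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException;
    import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse;
    import org.apache.hadoop.hbase.util.ByteBufferOutputStream;
    import com.google.protobuf.ByteString;

    public class ResponseBuildSketch {
      static ByteBuffer buildResponse(int callId, byte[] resultBytes,
          String errorClass, String error) throws IOException {
        RpcResponse.Builder builder = RpcResponse.newBuilder()
            .setCallId(callId)
            .setError(error != null);
        if (error != null) {
          builder.setException(RpcException.newBuilder()
              .setExceptionName(errorClass)
              .setStackTrace(error)
              .build());
        } else {
          builder.setResponse(ByteString.copyFrom(resultBytes));
        }
        ByteBufferOutputStream buf = new ByteBufferOutputStream(1024);
        builder.build().writeDelimitedTo(buf);   // varint length + message bytes
        ByteBuffer bb = buf.getByteBuffer();
        bb.position(0);
        return bb;                               // handed to the Responder as before
      }
    }
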
@@ -1065,9 +1058,9 @@ public abstract class HBaseServer implem
     // disconnected, we can say where it used to connect to.
     protected String hostAddress;
     protected int remotePort;
-    ConnectionHeader header = new ConnectionHeader();
+    ConnectionHeader header;
     Class<? extends VersionedProtocol> protocol;
-    protected User ticket = null;
+    protected User user = null;
 
     public Connection(SocketChannel channel, long lastContact) {
       this.channel = channel;
@@ -1231,26 +1224,21 @@ public abstract class HBaseServer implem
 
     /// Reads the connection header following version
     private void processHeader() throws IOException {
-      DataInputStream in =
-        new DataInputStream(new ByteArrayInputStream(data.array()));
-      header.readFields(in);
+      header = ConnectionHeader.parseFrom(new ByteArrayInputStream(data.array()));
       try {
         String protocolClassName = header.getProtocol();
-        if (protocolClassName == null) {
-          protocolClassName = "org.apache.hadoop.hbase.ipc.HRegionInterface";
-        }
         protocol = getProtocolClass(protocolClassName, conf);
       } catch (ClassNotFoundException cnfe) {
         throw new IOException("Unknown protocol: " + header.getProtocol());
       }
 
-      ticket = header.getUser();
+      user = User.createUser(header);
     }
 
     protected void processData(byte[] buf) throws  IOException, InterruptedException {
-      DataInputStream dis =
-        new DataInputStream(new ByteArrayInputStream(buf));
-      int id = dis.readInt();                    // try to read an id
+      RpcRequest request = RpcRequest.parseFrom(buf);
+      int id = request.getCallId();
+      ByteString clientRequest = request.getRequest();
       long callSize = buf.length;
 
       if (LOG.isDebugEnabled()) {
@@ -1271,6 +1259,8 @@ public abstract class HBaseServer implem
 
       Writable param;
       try {
+        DataInputStream dis =
+            new DataInputStream(clientRequest.newInput());
         param = ReflectionUtils.newInstance(paramClass, conf);//read param
         param.readFields(dis);
       } catch (Throwable t) {
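
On the server, processHeader() and processData() mirror the client: the ConnectionHeader and RpcRequest are parsed straight from the buffered bytes, and the Invocation Writable is then deserialized out of the embedded request ByteString. A condensed sketch, with illustrative names:

    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
    import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequest;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.util.ReflectionUtils;

    public class ServerDecodeSketch {
      // Parse the connection header bytes that follow the version byte.
      static String protocolName(byte[] headerBytes) throws IOException {
        ConnectionHeader header =
            ConnectionHeader.parseFrom(new ByteArrayInputStream(headerBytes));
        return header.getProtocol();   // protobuf string fields are never null
      }

      // Parse one call: envelope via protobuf, parameter still via Writable.
      static Writable decodeCall(byte[] callBytes,
          Class<? extends Writable> paramClass, Configuration conf) throws IOException {
        RpcRequest request = RpcRequest.parseFrom(callBytes);
        int id = request.getCallId();  // matched against the client's call id
        Writable param = ReflectionUtils.newInstance(paramClass, conf);
        param.readFields(new DataInputStream(request.getRequest().newInput()));
        return param;
      }
    }
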
@@ -1372,12 +1362,12 @@ public abstract class HBaseServer implem
               throw new ServerNotRunningYetException("Server is not running yet");
 
             if (LOG.isDebugEnabled()) {
-              User remoteUser = call.connection.ticket;
+              User remoteUser = call.connection.user;
               LOG.debug(getName() + ": call #" + call.id + " executing as "
                   + (remoteUser == null ? "NULL principal" : remoteUser.getName()));
             }
 
-            RequestContext.set(call.connection.ticket, getRemoteIp(),
+            RequestContext.set(call.connection.user, getRemoteIp(),
                 call.connection.protocol);
             // make the call
             value = call(call.connection.protocol, call.param, call.timestamp,