You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by da...@apache.org on 2013/07/16 21:04:17 UTC

svn commit: r1503832 - in /hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common: ./ src/main/java/org/apache/hadoop/ipc/ src/test/java/org/apache/hadoop/ipc/

Author: daryn
Date: Tue Jul 16 19:04:17 2013
New Revision: 1503832

URL: http://svn.apache.org/r1503832
Log:
merge -c 1503830 FIXES: HADOOP-9683. [RPC v9] Wrap IpcConnectionContext in RPC headers (daryn)

Modified:
    hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/CHANGES.txt
    hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
    hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
    hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1503832&r1=1503831&r2=1503832&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/CHANGES.txt Tue Jul 16 19:04:17 2013
@@ -39,6 +39,8 @@ Release 2.1.0-beta - 2013-07-02
     HADOOP-9421. [RPC v9] Convert SASL to use ProtoBuf and provide
     negotiation capabilities (daryn)
 
+    HADOOP-9683. [RPC v9] Wrap IpcConnectionContext in RPC headers (daryn)
+
   NEW FEATURES
 
     HADOOP-9283. Add support for running the Hadoop client on AIX. (atm)

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1503832&r1=1503831&r2=1503832&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java Tue Jul 16 19:04:17 2013
@@ -62,7 +62,10 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
+import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcRequestMessageWrapper;
+import org.apache.hadoop.ipc.RPC.RpcKind;
 import org.apache.hadoop.ipc.Server.AuthProtocol;
+import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
@@ -113,6 +116,8 @@ public class Client {
   private final boolean fallbackAllowed;
   
   final static int PING_CALL_ID = -1;
+
+  final static int CONNECTION_CONTEXT_CALL_ID = -3;
   
   /**
    * Executor on which IPC calls' parameters are sent. Deferring
@@ -779,17 +784,19 @@ public class Client {
                                         AuthMethod authMethod)
                                             throws IOException {
       // Write out the ConnectionHeader
-      DataOutputBuffer buf = new DataOutputBuffer();
-      ProtoUtil.makeIpcConnectionContext(
+      IpcConnectionContextProto message = ProtoUtil.makeIpcConnectionContext(
           RPC.getProtocolName(remoteId.getProtocol()),
           remoteId.getTicket(),
-          authMethod).writeTo(buf);
+          authMethod);
+      RpcRequestHeaderProto connectionContextHeader =
+          ProtoUtil.makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
+              OperationProto.RPC_FINAL_PACKET, CONNECTION_CONTEXT_CALL_ID);
+      RpcRequestMessageWrapper request =
+          new RpcRequestMessageWrapper(connectionContextHeader, message);
       
       // Write out the packet length
-      int bufLen = buf.getLength();
-
-      out.writeInt(bufLen);
-      out.write(buf.getData(), 0, bufLen);
+      out.writeInt(request.getLength());
+      request.write(out);
     }
     
     /* wait till someone signals us to start reading RPC response or

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1503832&r1=1503831&r2=1503832&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Tue Jul 16 19:04:17 2013
@@ -71,7 +71,6 @@ import org.apache.hadoop.fs.CommonConfig
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcRequestMessageWrapper;
 import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseWrapper;
 import org.apache.hadoop.ipc.RPC.RpcInvoker;
 import org.apache.hadoop.ipc.RPC.VersionMismatch;
@@ -108,6 +107,7 @@ import com.google.common.annotations.Vis
 import com.google.protobuf.ByteString;
 import com.google.protobuf.CodedOutputStream;
 import com.google.protobuf.Message;
+import com.google.protobuf.Message.Builder;
 
 /** An abstract IPC service.  IPC calls take a single {@link Writable} as a
  * parameter, and return a {@link Writable} as their value.  A service runs on
@@ -761,9 +761,10 @@ public abstract class Server {
         LOG.info(getName() + ": readAndProcess caught InterruptedException", ieo);
         throw ieo;
       } catch (Exception e) {
-        LOG.info(getName() + ": readAndProcess threw exception " + e +
-            " from client " + c.getHostAddress() +
-            ". Count of bytes read: " + count, e);
+        // log stack trace for "interesting" exceptions not sent to client
+        LOG.info(getName() + ": readAndProcess from client " +
+            c.getHostAddress() + " threw exception [" + e + "]",
+            (e instanceof WrappedRpcServerException) ? null : e);
         count = -1; //so that the (count < 0) block is executed
       }
       if (count < 0) {
@@ -1083,6 +1084,32 @@ public abstract class Server {
     }
   };
   
+  /**
+   * Wrapper for RPC IOExceptions to be returned to the client.  Used to
+   * let exceptions bubble up to top of processOneRpc where the correct
+   * callId can be associated with the response.  Also used to prevent
+   * unnecessary stack trace logging if it's not an internal server error. 
+   */
+  @SuppressWarnings("serial")
+  private static class WrappedRpcServerException extends RpcServerException {
+    private final RpcErrorCodeProto errCode;
+    public WrappedRpcServerException(RpcErrorCodeProto errCode, IOException ioe) {
+      super(ioe.toString(), ioe);
+      this.errCode = errCode;
+    }
+    public WrappedRpcServerException(RpcErrorCodeProto errCode, String message) {
+      this(errCode, new RpcServerException(message));
+    }
+    @Override
+    public RpcErrorCodeProto getRpcErrorCodeProto() {
+      return errCode;
+    }
+    @Override
+    public String toString() {
+      return getCause().toString();
+    }
+  }
+
   /** Reads calls from a connection and queues them for handling. */
   public class Connection {
     private boolean connectionHeaderRead = false; // connection  header is read?
@@ -1120,6 +1147,8 @@ public abstract class Server {
 
     // Fake 'call' for failed authorization response
     private static final int AUTHORIZATION_FAILED_CALLID = -1;
+    private static final int CONNECTION_CONTEXT_CALL_ID = -3;
+    
     private final Call authFailedCall = 
       new Call(AUTHORIZATION_FAILED_CALLID, null, this);
     private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream();
@@ -1200,7 +1229,7 @@ public abstract class Server {
     }
     
     private UserGroupInformation getAuthorizedUgi(String authorizedId)
-        throws IOException {
+        throws InvalidToken, AccessControlException {
       if (authMethod == AuthMethod.TOKEN) {
         TokenIdentifier tokenId = SaslRpcServer.getIdentifier(authorizedId,
             secretManager);
@@ -1216,12 +1245,17 @@ public abstract class Server {
       }
     }
 
-    private void saslReadAndProcess(byte[] saslToken) throws IOException,
-        InterruptedException {
-      if (!saslContextEstablished) {
-        RpcSaslProto saslResponse;
+    private RpcSaslProto saslReadAndProcess(DataInputStream dis) throws
+        WrappedRpcServerException, InterruptedException {
+      if (saslContextEstablished) {
+        throw new WrappedRpcServerException(
+            RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
+            new SaslException("Negotiation is already complete"));
+      }
+      RpcSaslProto saslResponse = null;
+      try {
         try {
-          saslResponse = processSaslMessage(saslToken);
+          saslResponse = processSaslMessage(dis);
         } catch (IOException e) {
           IOException sendToClient = e;
           Throwable cause = e;
@@ -1237,9 +1271,7 @@ public abstract class Server {
           // attempting user could be null
           AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser +
             " (" + e.getLocalizedMessage() + ")");
-          // wait to send response until failure is logged
-          doSaslReply(sendToClient);
-          throw e;
+          throw sendToClient;
         }
         
         if (saslServer != null && saslServer.isComplete()) {
@@ -1257,37 +1289,19 @@ public abstract class Server {
           AUDITLOG.info(AUTH_SUCCESSFUL_FOR + user);
           saslContextEstablished = true;
         }
-        // send reply here to avoid a successful auth being logged as a
-        // failure if response can't be sent
-        doSaslReply(saslResponse);
-      } else {
-        if (LOG.isDebugEnabled())
-          LOG.debug("Have read input token of size " + saslToken.length
-              + " for processing by saslServer.unwrap()");
-        
-        if (!useWrap) {
-          processOneRpc(saslToken);
-        } else {
-          byte[] plaintextData = saslServer.unwrap(saslToken, 0,
-              saslToken.length);
-          processUnwrappedData(plaintextData);
-        }
+      } catch (WrappedRpcServerException wrse) { // don't re-wrap
+        throw wrse;
+      } catch (IOException ioe) {
+        throw new WrappedRpcServerException(
+            RpcErrorCodeProto.FATAL_UNAUTHORIZED, ioe);
       }
+      return saslResponse; 
     }
     
-    private RpcSaslProto processSaslMessage(byte[] buf)
+    private RpcSaslProto processSaslMessage(DataInputStream dis)
         throws IOException, InterruptedException {
-      final DataInputStream dis =
-          new DataInputStream(new ByteArrayInputStream(buf));
-      RpcRequestMessageWrapper requestWrapper = new RpcRequestMessageWrapper();
-      requestWrapper.readFields(dis);
-      
-      final RpcRequestHeaderProto rpcHeader = requestWrapper.requestHeader;
-      if (rpcHeader.getCallId() != AuthProtocol.SASL.callId) {
-        throw new SaslException("Client sent non-SASL request");
-      }      
       final RpcSaslProto saslMessage =
-          RpcSaslProto.parseFrom(requestWrapper.theRequestRead);
+          decodeProtobufFromStream(RpcSaslProto.newBuilder(), dis);
       RpcSaslProto saslResponse = null;
       final SaslState state = saslMessage.getState(); // required      
       switch (state) {
@@ -1337,8 +1351,7 @@ public abstract class Server {
       return saslResponse;
     }
     
-    private RpcSaslProto buildSaslResponse(SaslState state, byte[] replyToken)
-        throws IOException {
+    private RpcSaslProto buildSaslResponse(SaslState state, byte[] replyToken) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Will send " + state + " token of size "
             + ((replyToken != null) ? replyToken.length : null)
@@ -1352,8 +1365,7 @@ public abstract class Server {
       return response.build();
     }
     
-    private void doSaslReply(Message message)
-        throws IOException {
+    private void doSaslReply(Message message) throws IOException {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Sending sasl message "+message);
       }
@@ -1465,16 +1477,7 @@ public abstract class Server {
           dataLengthBuffer.clear();
           data.flip();
           boolean isHeaderRead = connectionContextRead;
-          if (authProtocol == AuthProtocol.SASL) {
-            // switch to simple must ignore next negotiate or initiate
-            if (skipInitialSaslHandshake) {
-              authProtocol = AuthProtocol.NONE;
-            } else {
-              saslReadAndProcess(data.array());
-            }
-          } else {
-            processOneRpc(data.array());
-          }
+          processRpcRequestPacket(data.array());
           data = null;
           if (!isHeaderRead) {
             continue;
@@ -1509,6 +1512,7 @@ public abstract class Server {
           // switch to simple hack, but don't switch if other auths are
           // supported, ex. tokens
           if (isSimpleEnabled && enabledAuthMethods.size() == 1) {
+            authProtocol = AuthProtocol.NONE;
             skipInitialSaslHandshake = true;
             doSaslReply(buildSaslResponse(SaslState.SUCCESS, null));
           }
@@ -1608,11 +1612,21 @@ public abstract class Server {
       responder.doRespond(fakeCall);
     }
 
-    /** Reads the connection context following the connection header */
-    private void processConnectionContext(byte[] buf) throws IOException {
-      DataInputStream in =
-        new DataInputStream(new ByteArrayInputStream(buf));
-      connectionContext = IpcConnectionContextProto.parseFrom(in);
+    /** Reads the connection context following the connection header
+     * @param dis - DataInputStream from which to read the header 
+     * @throws WrappedRpcServerException - if the header cannot be
+     *         deserialized, or the user is not authorized
+     */ 
+    private void processConnectionContext(DataInputStream dis)
+        throws WrappedRpcServerException {
+      // allow only one connection context during a session
+      if (connectionContextRead) {
+        throw new WrappedRpcServerException(
+            RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
+            "Connection context already processed");
+      }
+      connectionContext = decodeProtobufFromStream(
+          IpcConnectionContextProto.newBuilder(), dis);
       protocolName = connectionContext.hasProtocol() ? connectionContext
           .getProtocol() : null;
 
@@ -1629,9 +1643,11 @@ public abstract class Server {
             && (!protocolUser.getUserName().equals(user.getUserName()))) {
           if (authMethod == AuthMethod.TOKEN) {
             // Not allowed to doAs if token authentication is used
-            throw new AccessControlException("Authenticated user (" + user
-                + ") doesn't match what the client claims to be ("
-                + protocolUser + ")");
+            throw new WrappedRpcServerException(
+                RpcErrorCodeProto.FATAL_UNAUTHORIZED,
+                new AccessControlException("Authenticated user (" + user
+                    + ") doesn't match what the client claims to be ("
+                    + protocolUser + ")"));
           } else {
             // Effective user can be different from authenticated user
             // for simple auth or kerberos auth
@@ -1642,9 +1658,34 @@ public abstract class Server {
           }
         }
       }
+      authorizeConnection();
+      // don't set until after authz because connection isn't established
+      connectionContextRead = true;
+    }
+    
+    /**
+     * Process a RPC Request - if SASL wrapping is enabled, unwrap the
+     * requests and process each one, else directly process the request 
+     * @param buf - single request or SASL wrapped requests
+     * @throws IOException - connection failed to authenticate or authorize,
+     *   or the request could not be decoded into a Call
+     * @throws InterruptedException
+     */    
+    private void processRpcRequestPacket(byte[] buf) throws IOException,
+        InterruptedException {
+      if (saslContextEstablished && useWrap) {
+        if (LOG.isDebugEnabled())
+          LOG.debug("Have read input token of size " + buf.length
+              + " for processing by saslServer.unwrap()");        
+        final byte[] plaintextData = saslServer.unwrap(buf, 0, buf.length);
+        // loops over decoded data and calls processOneRpc
+        unwrapPacketAndProcessRpcs(plaintextData);
+      } else {
+        processOneRpc(buf);
+      }
     }
     
-    private void processUnwrappedData(byte[] inBuf) throws IOException,
+    private void unwrapPacketAndProcessRpcs(byte[] inBuf) throws IOException,
         InterruptedException {
       ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(
           inBuf));
@@ -1683,61 +1724,93 @@ public abstract class Server {
       }
     }
     
-    private void processOneRpc(byte[] buf) throws IOException,
-        InterruptedException {
-      if (connectionContextRead) {
-        processRpcRequest(buf);
-      } else {
-        processConnectionContext(buf);
-        connectionContextRead = true;
-        if (!authorizeConnection()) {
-          throw new AccessControlException("Connection from " + this
-              + " for protocol " + connectionContext.getProtocol()
-              + " is unauthorized for user " + user);      
+    /**
+     * Process an RPC Request - handle connection setup and decoding of
+     * request into a Call
+     * @param buf - contains the RPC request header and the rpc request
+     * @throws IOException - internal error that should not be returned to
+     *         client, typically failure to respond to client
+     * @throws WrappedRpcServerException - an exception to be sent back to
+     *         the client that does not require verbose logging by the
+     *         Listener thread
+     * @throws InterruptedException
+     */    
+    private void processOneRpc(byte[] buf)
+        throws IOException, WrappedRpcServerException, InterruptedException {
+      int callId = -1;
+      try {
+        final DataInputStream dis =
+            new DataInputStream(new ByteArrayInputStream(buf));
+        final RpcRequestHeaderProto header =
+            decodeProtobufFromStream(RpcRequestHeaderProto.newBuilder(), dis);
+        callId = header.getCallId();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(" got #" + callId);
         }
+        checkRpcHeaders(header);
+        
+        if (callId < 0) { // callIds typically used during connection setup
+          processRpcOutOfBandRequest(header, dis);
+        } else if (!connectionContextRead) {
+          throw new WrappedRpcServerException(
+              RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
+              "Connection context not established");
+        } else {
+          processRpcRequest(header, dis);
+        }
+      } catch (WrappedRpcServerException wrse) { // inform client of error
+        Throwable ioe = wrse.getCause();
+        final Call call = new Call(callId, null, this);
+        setupResponse(authFailedResponse, call,
+            RpcStatusProto.FATAL, wrse.getRpcErrorCodeProto(), null,
+            ioe.getClass().getName(), ioe.getMessage());
+        responder.doRespond(call);
+        throw wrse;
       }
     }
-    
+
     /**
-     * Process an RPC Request - the connection headers and context have been
-     * read
-     * @param buf - contains the RPC request header and the rpc request
-     * @throws RpcServerException due to fatal rpc layer issues such as
-     *   invalid header. In this case a RPC fatal status response is sent back
-     *   to client.
+     * Verify RPC header is valid
+     * @param header - RPC request header
+     * @throws WrappedRpcServerException - header contains invalid values 
      */
-    
-    private void processRpcRequest(byte[] buf) 
-        throws  RpcServerException, IOException, InterruptedException {
-      DataInputStream dis =
-        new DataInputStream(new ByteArrayInputStream(buf));
-      RpcRequestHeaderProto header = RpcRequestHeaderProto.parseDelimitedFrom(dis);
-        
-      if (LOG.isDebugEnabled())
-        LOG.debug(" got #" + header.getCallId());
+    private void checkRpcHeaders(RpcRequestHeaderProto header)
+        throws WrappedRpcServerException {
       if (!header.hasRpcOp()) {
         String err = " IPC Server: No rpc op in rpcRequestHeader";
-        respondBadRpcHeader(new Call(header.getCallId(), null, this),
-            RpcServerException.class.getName(), err);
-        throw new RpcServerException(err);
+        throw new WrappedRpcServerException(
+            RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);
       }
       if (header.getRpcOp() != 
           RpcRequestHeaderProto.OperationProto.RPC_FINAL_PACKET) {
         String err = "IPC Server does not implement rpc header operation" + 
                 header.getRpcOp();
-        respondBadRpcHeader(new Call(header.getCallId(), null, this),
-            RpcServerException.class.getName(), err);
-        throw new RpcServerException(err);
+        throw new WrappedRpcServerException(
+            RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);
       }
       // If we know the rpc kind, get its class so that we can deserialize
       // (Note it would make more sense to have the handler deserialize but 
       // we continue with this original design.
       if (!header.hasRpcKind()) {
         String err = " IPC Server: No rpc kind in rpcRequestHeader";
-        respondBadRpcHeader(new Call(header.getCallId(), null, this),
-            RpcServerException.class.getName(), err);
-        throw new RpcServerException(err);
+        throw new WrappedRpcServerException(
+            RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);
       }
+    }
+
+    /**
+     * Process an RPC Request - the connection headers and context must
+     * have been already read
+     * @param header - RPC request header
+     * @param dis - stream to request payload
+     * @throws WrappedRpcServerException - due to fatal rpc layer issues such
+     *   as invalid header or deserialization error. In this case a RPC fatal
+     *   status response will later be sent back to client.
+     * @throws InterruptedException
+     */
+    private void processRpcRequest(RpcRequestHeaderProto header,
+        DataInputStream dis) throws WrappedRpcServerException,
+        InterruptedException {
       Class<? extends Writable> rpcRequestClass = 
           getRpcRequestWrapper(header.getRpcKind());
       if (rpcRequestClass == null) {
@@ -1745,9 +1818,8 @@ public abstract class Server {
             " from client " + getHostAddress());
         final String err = "Unknown rpc kind in rpc header"  + 
             header.getRpcKind();
-        respondBadRpcHeader(new Call(header.getCallId(), null, this),
-            RpcServerException.class.getName(), err);
-        throw new RpcServerException(err);   
+        throw new WrappedRpcServerException(
+            RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);   
       }
       Writable rpcRequest;
       try { //Read the rpc request
@@ -1757,17 +1829,9 @@ public abstract class Server {
         LOG.warn("Unable to read call parameters for client " +
                  getHostAddress() + "on connection protocol " +
             this.protocolName + " for rpcKind " + header.getRpcKind(),  t);
-        final Call readParamsFailedCall = 
-            new Call(header.getCallId(), null, this);
-        ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
         String err = "IPC server unable to read call parameters: "+ t.getMessage();
-
-        setupResponse(responseBuffer, readParamsFailedCall, 
-            RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST,
-            null, t.getClass().getName(),
-            err);
-        responder.doRespond(readParamsFailedCall);
-        throw new RpcServerException(err, t);
+        throw new WrappedRpcServerException(
+            RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST, err);
       }
         
       Call call = new Call(header.getCallId(), rpcRequest, this, 
@@ -1776,7 +1840,59 @@ public abstract class Server {
       incRpcCount();  // Increment the rpc count
     }
 
-    private boolean authorizeConnection() throws IOException {
+
+    /**
+     * Establish RPC connection setup by negotiating SASL if required, then
+     * reading and authorizing the connection header
+     * @param header - RPC header
+     * @param dis - stream to request payload
+     * @throws WrappedRpcServerException - setup failed due to SASL
+     *         negotiation failure, premature or invalid connection context,
+     *         or other state errors 
+     * @throws IOException - failed to send a response back to the client
+     * @throws InterruptedException
+     */
+    private void processRpcOutOfBandRequest(RpcRequestHeaderProto header,
+        DataInputStream dis) throws WrappedRpcServerException, IOException,
+        InterruptedException {
+      final int callId = header.getCallId();
+      if (callId == CONNECTION_CONTEXT_CALL_ID) {
+        // SASL must be established prior to connection context
+        if (authProtocol == AuthProtocol.SASL && !saslContextEstablished) {
+          throw new WrappedRpcServerException(
+              RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
+              "Connection header sent during SASL negotiation");
+        }
+        // read and authorize the user
+        processConnectionContext(dis);
+      } else if (callId == AuthProtocol.SASL.callId) {
+        // if client was switched to simple, ignore first SASL message
+        if (authProtocol != AuthProtocol.SASL) {
+          if (!skipInitialSaslHandshake) {
+            throw new WrappedRpcServerException(
+                RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
+                "SASL protocol not requested by client");
+          }
+          skipInitialSaslHandshake = false;
+          return;
+        }
+        RpcSaslProto response = saslReadAndProcess(dis);
+        // send back response if any, may throw IOException
+        if (response != null) {
+          doSaslReply(response);
+        }
+      } else {
+        throw new WrappedRpcServerException(
+            RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
+            "Unknown out of band call #" + callId);
+      }
+    }    
+
+    /**
+     * Authorize proxy users to access this server
+     * @throws WrappedRpcServerException - user is not allowed to proxy
+     */
+    private void authorizeConnection() throws WrappedRpcServerException {
       try {
         // If auth method is DIGEST, the token was obtained by the
         // real user for the effective user, therefore not required to
@@ -1792,17 +1908,37 @@ public abstract class Server {
         }
         rpcMetrics.incrAuthorizationSuccesses();
       } catch (AuthorizationException ae) {
+        LOG.info("Connection from " + this
+            + " for protocol " + connectionContext.getProtocol()
+            + " is unauthorized for user " + user);
         rpcMetrics.incrAuthorizationFailures();
-        setupResponse(authFailedResponse, authFailedCall, 
-            RpcStatusProto.FATAL,  RpcErrorCodeProto.FATAL_UNAUTHORIZED, null,
-            ae.getClass().getName(), ae.getMessage());
-        responder.doRespond(authFailedCall);
-        return false;
+        throw new WrappedRpcServerException(
+            RpcErrorCodeProto.FATAL_UNAUTHORIZED, ae);
       }
-      return true;
     }
     
     /**
+     * Decode the a protobuf from the given input stream 
+     * @param builder - Builder of the protobuf to decode
+     * @param dis - DataInputStream to read the protobuf
+     * @return Message - decoded protobuf
+     * @throws WrappedRpcServerException - deserialization failed
+     */
+    @SuppressWarnings("unchecked")
+    private <T extends Message> T decodeProtobufFromStream(Builder builder,
+        DataInputStream dis) throws WrappedRpcServerException {
+      try {
+        builder.mergeDelimitedFrom(dis);
+        return (T)builder.build();
+      } catch (Exception ioe) {
+        Class<?> protoClass = builder.getDefaultInstanceForType().getClass();
+        throw new WrappedRpcServerException(
+            RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST,
+            "Error decoding " + protoClass.getSimpleName() + ": "+ ioe);
+      }
+    }
+
+    /**
      * Get service class for connection
      * @return the serviceClass
      */
@@ -2206,18 +2342,6 @@ public abstract class Server {
   }
   
   
-  private void respondBadRpcHeader(Call call, String errorClass, String error)
-      throws IOException
-  {
-    ByteArrayOutputStream responseBuf = new ByteArrayOutputStream();
-    setupResponse(responseBuf, call, 
-        RpcStatusProto.FATAL,  RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
-        null, errorClass, error);
-    responder.doRespond(call);
-    return; 
-    
-  }
-  
   private void wrapWithSasl(ByteArrayOutputStream response, Call call)
       throws IOException {
     if (call.connection.saslServer != null) {

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java?rev=1503832&r1=1503831&r2=1503832&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java Tue Jul 16 19:04:17 2013
@@ -312,7 +312,7 @@ public class TestSaslRPC {
       doDigestRpc(server, sm);
     } catch (RemoteException e) {
       LOG.info("LOGGING MESSAGE: " + e.getLocalizedMessage());
-      assertTrue(ERROR_MESSAGE.equals(e.getLocalizedMessage()));
+      assertEquals(ERROR_MESSAGE, e.getLocalizedMessage());
       assertTrue(e.unwrapRemoteException() instanceof InvalidToken);
       succeeded = true;
     }
@@ -818,6 +818,7 @@ public class TestSaslRPC {
     }
 
     try {
+      LOG.info("trying ugi:"+clientUgi+" tokens:"+clientUgi.getTokens());
       return clientUgi.doAs(new PrivilegedExceptionAction<String>() {
         @Override
         public String run() throws IOException {