You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2013/08/12 23:26:02 UTC

svn commit: r1513258 [5/9] - in /hadoop/common/branches/YARN-321/hadoop-common-project: ./ hadoop-annotations/ hadoop-auth-examples/ hadoop-auth-examples/src/main/webapp/ hadoop-auth-examples/src/main/webapp/annonymous/ hadoop-auth-examples/src/main/we...

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Mon Aug 12 21:25:49 2013
@@ -71,7 +71,9 @@ import org.apache.hadoop.fs.CommonConfig
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcRequestMessageWrapper;
+import static org.apache.hadoop.ipc.RpcConstants.*;
+
+import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseMessageWrapper;
 import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseWrapper;
 import org.apache.hadoop.ipc.RPC.RpcInvoker;
 import org.apache.hadoop.ipc.RPC.VersionMismatch;
@@ -84,6 +86,7 @@ import org.apache.hadoop.ipc.protobuf.Rp
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcSaslProto;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcSaslProto.SaslAuth;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcSaslProto.SaslState;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
@@ -108,6 +111,7 @@ import com.google.common.annotations.Vis
 import com.google.protobuf.ByteString;
 import com.google.protobuf.CodedOutputStream;
 import com.google.protobuf.Message;
+import com.google.protobuf.Message.Builder;
 
 /** An abstract IPC service.  IPC calls take a single {@link Writable} as a
  * parameter, and return a {@link Writable} as their value.  A service runs on
@@ -155,11 +159,7 @@ public abstract class Server {
       return terseExceptions.contains(t.toString());
     }
   }
-  
-  /**
-   * The first four bytes of Hadoop RPC connections
-   */
-  public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
+
   
   /**
    * If the user accidentally sends an HTTP GET to an IPC port, we detect this
@@ -177,17 +177,6 @@ public abstract class Server {
     "Content-type: text/plain\r\n\r\n" +
     "It looks like you are making an HTTP request to a Hadoop IPC port. " +
     "This is not the correct port for the web interface on this daemon.\r\n";
-  
-  // 1 : Introduce ping and server does not throw away RPCs
-  // 3 : Introduce the protocol into the RPC connection header
-  // 4 : Introduced SASL security layer
-  // 5 : Introduced use of {@link ArrayPrimitiveWritable$Internal}
-  //     in ObjectWritable to efficiently transmit arrays of primitives
-  // 6 : Made RPC Request header explicit
-  // 7 : Changed Ipc Connection Header to use Protocol buffers
-  // 8 : SASL server always sends a final response
-  // 9 : Changes to protocol for HADOOP-8990
-  public static final byte CURRENT_VERSION = 9;
 
   /**
    * Initial and max size of response buffer
@@ -281,16 +270,50 @@ public abstract class Server {
    */
   private static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();
   
+  /** Get the thread-local that holds the current call */
+  @VisibleForTesting
+  public static ThreadLocal<Call> getCurCall() {
+    return CurCall;
+  }
+  
+  /**
+   * Returns the currently active RPC call's sequential ID number.  A negative
+   * call ID indicates an invalid value, such as if there is no currently active
+   * RPC call.
+   * 
+   * @return int sequential ID number of currently active RPC call
+   */
+  public static int getCallId() {
+    Call call = CurCall.get();
+    return call != null ? call.callId : RpcConstants.INVALID_CALL_ID;
+  }
+  
+  /**
+   * @return The currently active RPC call's retry count. -1 indicates that
+   *         the retry cache is not supported on the client side.
+   */
+  public static int getCallRetryCount() {
+    Call call = CurCall.get();
+    return call != null ? call.retryCount : RpcConstants.INVALID_RETRY_COUNT;
+  }
+
   /** Returns the remote side ip address when invoked inside an RPC 
    *  Returns null incase of an error.
    */
   public static InetAddress getRemoteIp() {
     Call call = CurCall.get();
-    if (call != null) {
-      return call.connection.getHostInetAddress();
-    }
-    return null;
+    return (call != null && call.connection != null) ? call.connection
+        .getHostInetAddress() : null;
   }
+  
+  /**
+   * Returns the clientId from the current RPC request
+   */
+  public static byte[] getClientId() {
+    Call call = CurCall.get();
+    return call != null ? call.clientId : RpcConstants.DUMMY_CLIENT_ID;
+  }
+  
   /** Returns remote address as a string when invoked inside an RPC.
    *  Returns null in case of an error.
    */
@@ -305,7 +328,8 @@ public abstract class Server {
    */
   public static UserGroupInformation getRemoteUser() {
     Call call = CurCall.get();
-    return (call != null) ? call.connection.user : null;
+    return (call != null && call.connection != null) ? call.connection.user
+        : null;
   }
  
   /** Return true if the invocation was through an RPC.
@@ -443,30 +467,39 @@ public abstract class Server {
   }
 
   /** A call queued for handling. */
-  private static class Call {
+  public static class Call {
     private final int callId;             // the client's call id
+    private final int retryCount;        // the retry count of the call
     private final Writable rpcRequest;    // Serialized Rpc request from client
     private final Connection connection;  // connection to client
     private long timestamp;               // time received when response is null
                                           // time served when response is not null
     private ByteBuffer rpcResponse;       // the response for this call
     private final RPC.RpcKind rpcKind;
+    private final byte[] clientId;
 
-    public Call(int id, Writable param, Connection connection) {
-      this( id,  param,  connection, RPC.RpcKind.RPC_BUILTIN );    
+    public Call(int id, int retryCount, Writable param, 
+        Connection connection) {
+      this(id, retryCount, param, connection, RPC.RpcKind.RPC_BUILTIN,
+          RpcConstants.DUMMY_CLIENT_ID);
     }
-    public Call(int id, Writable param, Connection connection, RPC.RpcKind kind) { 
+
+    public Call(int id, int retryCount, Writable param, Connection connection,
+        RPC.RpcKind kind, byte[] clientId) {
       this.callId = id;
+      this.retryCount = retryCount;
       this.rpcRequest = param;
       this.connection = connection;
       this.timestamp = Time.now();
       this.rpcResponse = null;
       this.rpcKind = kind;
+      this.clientId = clientId;
     }
     
     @Override
     public String toString() {
-      return rpcRequest.toString() + " from " + connection.toString();
+      return rpcRequest + " from " + connection + " Call#" + callId + " Retry#"
+          + retryCount;
     }
 
     public void setResponse(ByteBuffer response) {
@@ -761,9 +794,13 @@ public abstract class Server {
         LOG.info(getName() + ": readAndProcess caught InterruptedException", ieo);
         throw ieo;
       } catch (Exception e) {
-        LOG.info(getName() + ": readAndProcess threw exception " + e +
-            " from client " + c.getHostAddress() +
-            ". Count of bytes read: " + count, e);
+        // a WrappedRpcServerException is an exception that has been sent
+        // to the client, so the stacktrace is unnecessary; any other
+        // exceptions are unexpected internal server errors and thus the
+        // stacktrace should be logged
+        LOG.info(getName() + ": readAndProcess from client " +
+            c.getHostAddress() + " threw exception [" + e + "]",
+            (e instanceof WrappedRpcServerException) ? null : e);
         count = -1; //so that the (count < 0) block is executed
       }
       if (count < 0) {
@@ -882,11 +919,7 @@ public abstract class Server {
           }
           
           for(Call call : calls) {
-            try {
-              doPurge(call, now);
-            } catch (IOException e) {
-              LOG.warn("Error in purging old calls " + e);
-            }
+            doPurge(call, now);
           }
         } catch (OutOfMemoryError e) {
           //
@@ -931,7 +964,7 @@ public abstract class Server {
     // Remove calls that have been pending in the responseQueue 
     // for a long time.
     //
-    private void doPurge(Call call, long now) throws IOException {
+    private void doPurge(Call call, long now) {
       LinkedList<Call> responseQueue = call.connection.responseQueue;
       synchronized (responseQueue) {
         Iterator<Call> iter = responseQueue.listIterator(0);
@@ -970,8 +1003,7 @@ public abstract class Server {
           call = responseQueue.removeFirst();
           SocketChannel channel = call.connection.channel;
           if (LOG.isDebugEnabled()) {
-            LOG.debug(getName() + ": responding to #" + call.callId + " from " +
-                      call.connection);
+            LOG.debug(getName() + ": responding to " + call);
           }
           //
           // Send as much data as we can in the non-blocking fashion
@@ -990,8 +1022,8 @@ public abstract class Server {
               done = false;            // more calls pending to be sent.
             }
             if (LOG.isDebugEnabled()) {
-              LOG.debug(getName() + ": responding to #" + call.callId + " from " +
-                        call.connection + " Wrote " + numBytes + " bytes.");
+              LOG.debug(getName() + ": responding to " + call
+                  + " Wrote " + numBytes + " bytes.");
             }
           } else {
             //
@@ -1018,9 +1050,8 @@ public abstract class Server {
               }
             }
             if (LOG.isDebugEnabled()) {
-              LOG.debug(getName() + ": responding to #" + call.callId + " from " +
-                        call.connection + " Wrote partial " + numBytes + 
-                        " bytes.");
+              LOG.debug(getName() + ": responding to " + call
+                  + " Wrote partial " + numBytes + " bytes.");
             }
           }
           error = false;              // everything went off well
@@ -1083,6 +1114,32 @@ public abstract class Server {
     }
   };
   
+  /**
+   * Wrapper for RPC IOExceptions to be returned to the client.  Used to
+   * let exceptions bubble up to top of processOneRpc where the correct
+   * callId can be associated with the response.  Also used to prevent
+   * unnecessary stack trace logging if it's not an internal server error. 
+   */
+  @SuppressWarnings("serial")
+  private static class WrappedRpcServerException extends RpcServerException {
+    private final RpcErrorCodeProto errCode;
+    public WrappedRpcServerException(RpcErrorCodeProto errCode, IOException ioe) {
+      super(ioe.toString(), ioe);
+      this.errCode = errCode;
+    }
+    public WrappedRpcServerException(RpcErrorCodeProto errCode, String message) {
+      this(errCode, new RpcServerException(message));
+    }
+    @Override
+    public RpcErrorCodeProto getRpcErrorCodeProto() {
+      return errCode;
+    }
+    @Override
+    public String toString() {
+      return getCause().toString();
+    }
+  }
+
   /** Reads calls from a connection and queues them for handling. */
   public class Connection {
     private boolean connectionHeaderRead = false; // connection  header is read?
@@ -1109,7 +1166,6 @@ public abstract class Server {
     private AuthMethod authMethod;
     private AuthProtocol authProtocol;
     private boolean saslContextEstablished;
-    private boolean skipInitialSaslHandshake;
     private ByteBuffer connectionHeaderBuf = null;
     private ByteBuffer unwrappedData;
     private ByteBuffer unwrappedDataLengthBuffer;
@@ -1119,12 +1175,12 @@ public abstract class Server {
     public UserGroupInformation attemptingUser = null; // user name before auth
 
     // Fake 'call' for failed authorization response
-    private static final int AUTHORIZATION_FAILED_CALLID = -1;
-    private final Call authFailedCall = 
-      new Call(AUTHORIZATION_FAILED_CALLID, null, this);
+    private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALL_ID,
+        RpcConstants.INVALID_RETRY_COUNT, null, this);
     private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream();
     
-    private final Call saslCall = new Call(AuthProtocol.SASL.callId, null, this);
+    private final Call saslCall = new Call(AuthProtocol.SASL.callId,
+        RpcConstants.INVALID_RETRY_COUNT, null, this);
     private final ByteArrayOutputStream saslResponse = new ByteArrayOutputStream();
     
     private boolean sentNegotiate = false;
@@ -1200,7 +1256,7 @@ public abstract class Server {
     }
     
     private UserGroupInformation getAuthorizedUgi(String authorizedId)
-        throws IOException {
+        throws InvalidToken, AccessControlException {
       if (authMethod == AuthMethod.TOKEN) {
         TokenIdentifier tokenId = SaslRpcServer.getIdentifier(authorizedId,
             secretManager);
@@ -1216,12 +1272,37 @@ public abstract class Server {
       }
     }
 
-    private void saslReadAndProcess(byte[] saslToken) throws IOException,
-        InterruptedException {
-      if (!saslContextEstablished) {
-        RpcSaslProto saslResponse;
+    private void saslReadAndProcess(DataInputStream dis) throws
+    WrappedRpcServerException, IOException, InterruptedException {
+      final RpcSaslProto saslMessage =
+          decodeProtobufFromStream(RpcSaslProto.newBuilder(), dis);
+      switch (saslMessage.getState()) {
+        case WRAP: {
+          if (!saslContextEstablished || !useWrap) {
+            throw new WrappedRpcServerException(
+                RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
+                new SaslException("Server is not wrapping data"));
+          }
+          // loops over decoded data and calls processOneRpc
+          unwrapPacketAndProcessRpcs(saslMessage.getToken().toByteArray());
+          break;
+        }
+        default:
+          saslProcess(saslMessage);
+      }
+    }
+
+    private void saslProcess(RpcSaslProto saslMessage)
+        throws WrappedRpcServerException, IOException, InterruptedException {
+      if (saslContextEstablished) {
+        throw new WrappedRpcServerException(
+            RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
+            new SaslException("Negotiation is already complete"));
+      }
+      RpcSaslProto saslResponse = null;
+      try {
         try {
-          saslResponse = processSaslMessage(saslToken);
+          saslResponse = processSaslMessage(saslMessage);
         } catch (IOException e) {
           IOException sendToClient = e;
           Throwable cause = e;
@@ -1237,9 +1318,7 @@ public abstract class Server {
           // attempting user could be null
           AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser +
             " (" + e.getLocalizedMessage() + ")");
-          // wait to send response until failure is logged
-          doSaslReply(sendToClient);
-          throw e;
+          throw sendToClient;
         }
         
         if (saslServer != null && saslServer.isComplete()) {
@@ -1247,8 +1326,6 @@ public abstract class Server {
             LOG.debug("SASL server context established. Negotiated QoP is "
                 + saslServer.getNegotiatedProperty(Sasl.QOP));
           }
-          String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
-          useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
           user = getAuthorizedUgi(saslServer.getAuthorizationID());
           if (LOG.isDebugEnabled()) {
             LOG.debug("SASL server successfully authenticated client: " + user);
@@ -1257,37 +1334,27 @@ public abstract class Server {
           AUDITLOG.info(AUTH_SUCCESSFUL_FOR + user);
           saslContextEstablished = true;
         }
-        // send reply here to avoid a successful auth being logged as a
-        // failure if response can't be sent
+      } catch (WrappedRpcServerException wrse) { // don't re-wrap
+        throw wrse;
+      } catch (IOException ioe) {
+        throw new WrappedRpcServerException(
+            RpcErrorCodeProto.FATAL_UNAUTHORIZED, ioe);
+      }
+      // send back response if any, may throw IOException
+      if (saslResponse != null) {
         doSaslReply(saslResponse);
-      } else {
-        if (LOG.isDebugEnabled())
-          LOG.debug("Have read input token of size " + saslToken.length
-              + " for processing by saslServer.unwrap()");
-        
-        if (!useWrap) {
-          processOneRpc(saslToken);
-        } else {
-          byte[] plaintextData = saslServer.unwrap(saslToken, 0,
-              saslToken.length);
-          processUnwrappedData(plaintextData);
-        }
+      }
+      // do NOT enable wrapping until the last auth response is sent
+      if (saslContextEstablished) {
+        String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
+        // SASL wrapping is only used if the connection has a QOP, and
+        // the value is not auth.  ex. auth-int & auth-priv
+        useWrap = (qop != null && !"auth".equalsIgnoreCase(qop));        
       }
     }
     
-    private RpcSaslProto processSaslMessage(byte[] buf)
+    private RpcSaslProto processSaslMessage(RpcSaslProto saslMessage)
         throws IOException, InterruptedException {
-      final DataInputStream dis =
-          new DataInputStream(new ByteArrayInputStream(buf));
-      RpcRequestMessageWrapper requestWrapper = new RpcRequestMessageWrapper();
-      requestWrapper.readFields(dis);
-      
-      final RpcRequestHeaderProto rpcHeader = requestWrapper.requestHeader;
-      if (rpcHeader.getCallId() != AuthProtocol.SASL.callId) {
-        throw new SaslException("Client sent non-SASL request");
-      }      
-      final RpcSaslProto saslMessage =
-          RpcSaslProto.parseFrom(requestWrapper.theRequestRead);
       RpcSaslProto saslResponse = null;
       final SaslState state = saslMessage.getState(); // required      
       switch (state) {
@@ -1297,23 +1364,39 @@ public abstract class Server {
                 "Client already attempted negotiation");
           }
           saslResponse = buildSaslNegotiateResponse();
+          // simple-only server negotiate response is success which client
+          // interprets as switch to simple
+          if (saslResponse.getState() == SaslState.SUCCESS) {
+            switchToSimple();
+          }
           break;
         }
         case INITIATE: {
           if (saslMessage.getAuthsCount() != 1) {
             throw new SaslException("Client mechanism is malformed");
           }
-          String authMethodName = saslMessage.getAuths(0).getMethod();
-          authMethod = createSaslServer(authMethodName);
-          if (authMethod == null) { // the auth method is not supported
+          // verify the client requested an advertised authType
+          SaslAuth clientSaslAuth = saslMessage.getAuths(0);
+          if (!negotiateResponse.getAuthsList().contains(clientSaslAuth)) {
             if (sentNegotiate) {
               throw new AccessControlException(
-                  authMethodName + " authentication is not enabled."
+                  clientSaslAuth.getMethod() + " authentication is not enabled."
                       + "  Available:" + enabledAuthMethods);
             }
             saslResponse = buildSaslNegotiateResponse();
             break;
           }
+          authMethod = AuthMethod.valueOf(clientSaslAuth.getMethod());
+          // abort SASL for SIMPLE auth, server has already ensured that
+          // SIMPLE is a legit option above.  we will send no response
+          if (authMethod == AuthMethod.SIMPLE) {
+            switchToSimple();
+            break;
+          }
+          // sasl server for tokens may already be instantiated
+          if (saslServer == null || authMethod != AuthMethod.TOKEN) {
+            saslServer = createSaslServer(authMethod);
+          }
           // fallthru to process sasl token
         }
         case RESPONSE: {
@@ -1336,9 +1419,14 @@ public abstract class Server {
       }
       return saslResponse;
     }
+
+    private void switchToSimple() {
+      // disable SASL and blank out any SASL server
+      authProtocol = AuthProtocol.NONE;
+      saslServer = null;
+    }
     
-    private RpcSaslProto buildSaslResponse(SaslState state, byte[] replyToken)
-        throws IOException {
+    private RpcSaslProto buildSaslResponse(SaslState state, byte[] replyToken) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Will send " + state + " token of size "
             + ((replyToken != null) ? replyToken.length : null)
@@ -1352,8 +1440,7 @@ public abstract class Server {
       return response.build();
     }
     
-    private void doSaslReply(Message message)
-        throws IOException {
+    private void doSaslReply(Message message) throws IOException {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Sending sasl message "+message);
       }
@@ -1394,7 +1481,8 @@ public abstract class Server {
       }
     }
 
-    public int readAndProcess() throws IOException, InterruptedException {
+    public int readAndProcess()
+        throws WrappedRpcServerException, IOException, InterruptedException {
       while (true) {
         /* Read at most one RPC. If the header is not read completely yet
          * then iterate until we read first RPC or until there is no data left.
@@ -1427,8 +1515,9 @@ public abstract class Server {
             setupHttpRequestOnIpcPortResponse();
             return -1;
           }
-        
-          if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) {
+          
+          if (!RpcConstants.HEADER.equals(dataLengthBuffer)
+              || version != CURRENT_VERSION) {
             //Warning is ok since this is not supposed to happen.
             LOG.warn("Incorrect header or version mismatch from " + 
                      hostAddress + ":" + remotePort +
@@ -1450,11 +1539,6 @@ public abstract class Server {
         if (data == null) {
           dataLengthBuffer.flip();
           dataLength = dataLengthBuffer.getInt();
-          if ((dataLength == Client.PING_CALL_ID) && (!useWrap)) {
-            // covers the !useSasl too
-            dataLengthBuffer.clear();
-            return 0; // ping message
-          }
           checkDataLength(dataLength);
           data = ByteBuffer.allocate(dataLength);
         }
@@ -1465,16 +1549,7 @@ public abstract class Server {
           dataLengthBuffer.clear();
           data.flip();
           boolean isHeaderRead = connectionContextRead;
-          if (authProtocol == AuthProtocol.SASL) {
-            // switch to simple must ignore next negotiate or initiate
-            if (skipInitialSaslHandshake) {
-              authProtocol = AuthProtocol.NONE;
-            } else {
-              saslReadAndProcess(data.array());
-            }
-          } else {
-            processOneRpc(data.array());
-          }
+          processOneRpc(data.array());
           data = null;
           if (!isHeaderRead) {
             continue;
@@ -1485,7 +1560,7 @@ public abstract class Server {
     }
 
     private AuthProtocol initializeAuthContext(int authType)
-        throws IOException, InterruptedException {
+        throws IOException {
       AuthProtocol authProtocol = AuthProtocol.valueOf(authType);
       if (authProtocol == null) {
         IOException ioe = new IpcException("Unknown auth protocol:" + authType);
@@ -1505,14 +1580,7 @@ public abstract class Server {
           }
           break;
         }
-        case SASL: {
-          // switch to simple hack, but don't switch if other auths are
-          // supported, ex. tokens
-          if (isSimpleEnabled && enabledAuthMethods.size() == 1) {
-            skipInitialSaslHandshake = true;
-            doSaslReply(buildSaslResponse(SaslState.SUCCESS, null));
-          }
-          // else wait for a negotiate or initiate
+        default: {
           break;
         }
       }
@@ -1537,25 +1605,6 @@ public abstract class Server {
       return negotiateMessage;
     }
     
-    private AuthMethod createSaslServer(String authMethodName)
-        throws IOException, InterruptedException {
-      AuthMethod authMethod;
-      try {
-        authMethod = AuthMethod.valueOf(authMethodName);
-        if (!enabledAuthMethods.contains(authMethod)) {
-          authMethod = null;
-        }
-      } catch (IllegalArgumentException iae) {
-        authMethod = null;
-      }
-      if (authMethod != null &&
-          // sasl server for tokens may already be instantiated
-          (saslServer == null || authMethod != AuthMethod.TOKEN)) {
-        saslServer = createSaslServer(authMethod);
-      }
-      return authMethod;
-    }
-
     private SaslServer createSaslServer(AuthMethod authMethod)
         throws IOException, InterruptedException {
       return new SaslRpcServer(authMethod).create(this, secretManager);
@@ -1576,20 +1625,23 @@ public abstract class Server {
       
       if (clientVersion >= 9) {
         // Versions >>9  understand the normal response
-        Call fakeCall =  new Call(-1, null, this);
+        Call fakeCall = new Call(-1, RpcConstants.INVALID_RETRY_COUNT, null,
+            this);
         setupResponse(buffer, fakeCall, 
             RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_VERSION_MISMATCH,
             null, VersionMismatch.class.getName(), errMsg);
         responder.doRespond(fakeCall);
       } else if (clientVersion >= 3) {
-        Call fakeCall =  new Call(-1, null, this);
+        Call fakeCall = new Call(-1, RpcConstants.INVALID_RETRY_COUNT, null,
+            this);
         // Versions 3 to 8 use older response
         setupResponseOldVersionFatal(buffer, fakeCall,
             null, VersionMismatch.class.getName(), errMsg);
 
         responder.doRespond(fakeCall);
       } else if (clientVersion == 2) { // Hadoop 0.18.3
-        Call fakeCall =  new Call(0, null, this);
+        Call fakeCall = new Call(0, RpcConstants.INVALID_RETRY_COUNT, null,
+            this);
         DataOutputStream out = new DataOutputStream(buffer);
         out.writeInt(0); // call ID
         out.writeBoolean(true); // error
@@ -1602,17 +1654,27 @@ public abstract class Server {
     }
     
     private void setupHttpRequestOnIpcPortResponse() throws IOException {
-      Call fakeCall =  new Call(0, null, this);
+      Call fakeCall = new Call(0, RpcConstants.INVALID_RETRY_COUNT, null, this);
       fakeCall.setResponse(ByteBuffer.wrap(
           RECEIVED_HTTP_REQ_RESPONSE.getBytes()));
       responder.doRespond(fakeCall);
     }
 
-    /** Reads the connection context following the connection header */
-    private void processConnectionContext(byte[] buf) throws IOException {
-      DataInputStream in =
-        new DataInputStream(new ByteArrayInputStream(buf));
-      connectionContext = IpcConnectionContextProto.parseFrom(in);
+    /** Reads the connection context following the connection header
+     * @param dis - DataInputStream from which to read the header 
+     * @throws WrappedRpcServerException - if the header cannot be
+     *         deserialized, or the user is not authorized
+     */ 
+    private void processConnectionContext(DataInputStream dis)
+        throws WrappedRpcServerException {
+      // allow only one connection context during a session
+      if (connectionContextRead) {
+        throw new WrappedRpcServerException(
+            RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
+            "Connection context already processed");
+      }
+      connectionContext = decodeProtobufFromStream(
+          IpcConnectionContextProto.newBuilder(), dis);
       protocolName = connectionContext.hasProtocol() ? connectionContext
           .getProtocol() : null;
 
@@ -1629,9 +1691,11 @@ public abstract class Server {
             && (!protocolUser.getUserName().equals(user.getUserName()))) {
           if (authMethod == AuthMethod.TOKEN) {
             // Not allowed to doAs if token authentication is used
-            throw new AccessControlException("Authenticated user (" + user
-                + ") doesn't match what the client claims to be ("
-                + protocolUser + ")");
+            throw new WrappedRpcServerException(
+                RpcErrorCodeProto.FATAL_UNAUTHORIZED,
+                new AccessControlException("Authenticated user (" + user
+                    + ") doesn't match what the client claims to be ("
+                    + protocolUser + ")"));
           } else {
             // Effective user can be different from authenticated user
             // for simple auth or kerberos auth
@@ -1642,10 +1706,25 @@ public abstract class Server {
           }
         }
       }
+      authorizeConnection();
+      // don't set until after authz because connection isn't established
+      connectionContextRead = true;
     }
     
-    private void processUnwrappedData(byte[] inBuf) throws IOException,
-        InterruptedException {
+    /**
+     * Process a wrapped RPC Request - unwrap the SASL packet and process
+     * each embedded RPC request 
+     * @param inBuf - SASL wrapped request of one or more RPCs
+     * @throws IOException - SASL packet cannot be unwrapped
+     * @throws InterruptedException
+     */    
+    private void unwrapPacketAndProcessRpcs(byte[] inBuf)
+        throws WrappedRpcServerException, IOException, InterruptedException {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Have read input token of size " + inBuf.length
+            + " for processing by saslServer.unwrap()");
+      }
+      inBuf = saslServer.unwrap(inBuf, 0, inBuf.length);
       ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(
           inBuf));
       // Read all RPCs contained in the inBuf, even partial ones
@@ -1660,13 +1739,6 @@ public abstract class Server {
         if (unwrappedData == null) {
           unwrappedDataLengthBuffer.flip();
           int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
-
-          if (unwrappedDataLength == Client.PING_CALL_ID) {
-            if (LOG.isDebugEnabled())
-              LOG.debug("Received ping message");
-            unwrappedDataLengthBuffer.clear();
-            continue; // ping message
-          }
           unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
         }
 
@@ -1683,61 +1755,95 @@ public abstract class Server {
       }
     }
     
-    private void processOneRpc(byte[] buf) throws IOException,
-        InterruptedException {
-      if (connectionContextRead) {
-        processRpcRequest(buf);
-      } else {
-        processConnectionContext(buf);
-        connectionContextRead = true;
-        if (!authorizeConnection()) {
-          throw new AccessControlException("Connection from " + this
-              + " for protocol " + connectionContext.getProtocol()
-              + " is unauthorized for user " + user);      
+    /**
+     * Process an RPC Request - handle connection setup and decoding of
+     * request into a Call
+     * @param buf - contains the RPC request header and the rpc request
+     * @throws IOException - internal error that should not be returned to
+     *         client, typically failure to respond to client
+     * @throws WrappedRpcServerException - an exception to be sent back to
+     *         the client that does not require verbose logging by the
+     *         Listener thread
+     * @throws InterruptedException
+     */    
+    private void processOneRpc(byte[] buf)
+        throws IOException, WrappedRpcServerException, InterruptedException {
+      int callId = -1;
+      int retry = RpcConstants.INVALID_RETRY_COUNT;
+      try {
+        final DataInputStream dis =
+            new DataInputStream(new ByteArrayInputStream(buf));
+        final RpcRequestHeaderProto header =
+            decodeProtobufFromStream(RpcRequestHeaderProto.newBuilder(), dis);
+        callId = header.getCallId();
+        retry = header.getRetryCount();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(" got #" + callId);
         }
+        checkRpcHeaders(header);
+        
+        if (callId < 0) { // callIds typically used during connection setup
+          processRpcOutOfBandRequest(header, dis);
+        } else if (!connectionContextRead) {
+          throw new WrappedRpcServerException(
+              RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
+              "Connection context not established");
+        } else {
+          processRpcRequest(header, dis);
+        }
+      } catch (WrappedRpcServerException wrse) { // inform client of error
+        Throwable ioe = wrse.getCause();
+        final Call call = new Call(callId, retry, null, this);
+        setupResponse(authFailedResponse, call,
+            RpcStatusProto.FATAL, wrse.getRpcErrorCodeProto(), null,
+            ioe.getClass().getName(), ioe.getMessage());
+        responder.doRespond(call);
+        throw wrse;
       }
     }
-    
+
     /**
-     * Process an RPC Request - the connection headers and context have been
-     * read
-     * @param buf - contains the RPC request header and the rpc request
-     * @throws RpcServerException due to fatal rpc layer issues such as
-     *   invalid header. In this case a RPC fatal status response is sent back
-     *   to client.
+     * Verify RPC header is valid
+     * @param header - RPC request header
+     * @throws WrappedRpcServerException - header contains invalid values 
      */
-    
-    private void processRpcRequest(byte[] buf) 
-        throws  RpcServerException, IOException, InterruptedException {
-      DataInputStream dis =
-        new DataInputStream(new ByteArrayInputStream(buf));
-      RpcRequestHeaderProto header = RpcRequestHeaderProto.parseDelimitedFrom(dis);
-        
-      if (LOG.isDebugEnabled())
-        LOG.debug(" got #" + header.getCallId());
+    private void checkRpcHeaders(RpcRequestHeaderProto header)
+        throws WrappedRpcServerException {
       if (!header.hasRpcOp()) {
         String err = " IPC Server: No rpc op in rpcRequestHeader";
-        respondBadRpcHeader(new Call(header.getCallId(), null, this),
-            RpcServerException.class.getName(), err);
-        throw new RpcServerException(err);
+        throw new WrappedRpcServerException(
+            RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);
       }
       if (header.getRpcOp() != 
           RpcRequestHeaderProto.OperationProto.RPC_FINAL_PACKET) {
         String err = "IPC Server does not implement rpc header operation" + 
                 header.getRpcOp();
-        respondBadRpcHeader(new Call(header.getCallId(), null, this),
-            RpcServerException.class.getName(), err);
-        throw new RpcServerException(err);
+        throw new WrappedRpcServerException(
+            RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);
       }
       // If we know the rpc kind, get its class so that we can deserialize
       // (Note it would make more sense to have the handler deserialize but 
       // we continue with this original design.
       if (!header.hasRpcKind()) {
         String err = " IPC Server: No rpc kind in rpcRequestHeader";
-        respondBadRpcHeader(new Call(header.getCallId(), null, this),
-            RpcServerException.class.getName(), err);
-        throw new RpcServerException(err);
+        throw new WrappedRpcServerException(
+            RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);
       }
+    }
+
+    /**
+     * Process an RPC Request - the connection headers and context must
+     * have been already read
+     * @param header - RPC request header
+     * @param dis - stream to request payload
+     * @throws WrappedRpcServerException - due to fatal rpc layer issues such
+     *   as invalid header or deserialization error. In this case a RPC fatal
+     *   status response will later be sent back to client.
+     * @throws InterruptedException
+     */
+    private void processRpcRequest(RpcRequestHeaderProto header,
+        DataInputStream dis) throws WrappedRpcServerException,
+        InterruptedException {
       Class<? extends Writable> rpcRequestClass = 
           getRpcRequestWrapper(header.getRpcKind());
       if (rpcRequestClass == null) {
@@ -1745,9 +1851,8 @@ public abstract class Server {
             " from client " + getHostAddress());
         final String err = "Unknown rpc kind in rpc header"  + 
             header.getRpcKind();
-        respondBadRpcHeader(new Call(header.getCallId(), null, this),
-            RpcServerException.class.getName(), err);
-        throw new RpcServerException(err);   
+        throw new WrappedRpcServerException(
+            RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);   
       }
       Writable rpcRequest;
       try { //Read the rpc request
@@ -1757,28 +1862,67 @@ public abstract class Server {
         LOG.warn("Unable to read call parameters for client " +
                  getHostAddress() + "on connection protocol " +
             this.protocolName + " for rpcKind " + header.getRpcKind(),  t);
-        final Call readParamsFailedCall = 
-            new Call(header.getCallId(), null, this);
-        ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
         String err = "IPC server unable to read call parameters: "+ t.getMessage();
-
-        setupResponse(responseBuffer, readParamsFailedCall, 
-            RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST,
-            null, t.getClass().getName(),
-            err);
-        responder.doRespond(readParamsFailedCall);
-        throw new RpcServerException(err, t);
+        throw new WrappedRpcServerException(
+            RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST, err);
       }
         
-      Call call = new Call(header.getCallId(), rpcRequest, this, 
-          ProtoUtil.convert(header.getRpcKind()));
+      Call call = new Call(header.getCallId(), header.getRetryCount(),
+          rpcRequest, this, ProtoUtil.convert(header.getRpcKind()), header
+              .getClientId().toByteArray());
       callQueue.put(call);              // queue the call; maybe blocked here
       incRpcCount();  // Increment the rpc count
     }
 
-    private boolean authorizeConnection() throws IOException {
+
+    /**
+     * Establish RPC connection setup by negotiating SASL if required, then
+     * reading and authorizing the connection header
+     * @param header - RPC header
+     * @param dis - stream to request payload
+     * @throws WrappedRpcServerException - setup failed due to SASL
+     *         negotiation failure, premature or invalid connection context,
+     *         or other state errors 
+     * @throws IOException - failed to send a response back to the client
+     * @throws InterruptedException
+     */
+    private void processRpcOutOfBandRequest(RpcRequestHeaderProto header,
+        DataInputStream dis) throws WrappedRpcServerException, IOException,
+        InterruptedException {
+      final int callId = header.getCallId();
+      if (callId == CONNECTION_CONTEXT_CALL_ID) {
+        // SASL must be established prior to connection context
+        if (authProtocol == AuthProtocol.SASL && !saslContextEstablished) {
+          throw new WrappedRpcServerException(
+              RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
+              "Connection header sent during SASL negotiation");
+        }
+        // read and authorize the user
+        processConnectionContext(dis);
+      } else if (callId == AuthProtocol.SASL.callId) {
+        // if client was switched to simple, ignore first SASL message
+        if (authProtocol != AuthProtocol.SASL) {
+          throw new WrappedRpcServerException(
+              RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
+              "SASL protocol not requested by client");
+        }
+        saslReadAndProcess(dis);
+      } else if (callId == PING_CALL_ID) {
+        LOG.debug("Received ping message");
+      } else {
+        throw new WrappedRpcServerException(
+            RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
+            "Unknown out of band call #" + callId);
+      }
+    }    
+
+    /**
+     * Authorize proxy users to access this server
+     * @throws WrappedRpcServerException - user is not allowed to proxy
+     */
+    private void authorizeConnection() throws WrappedRpcServerException {
       try {
-        // If auth method is DIGEST, the token was obtained by the
+        // If auth method is TOKEN, the token was obtained by the
         // real user for the effective user, therefore not required to
         // authorize real user. doAs is allowed only for simple or kerberos
         // authentication
@@ -1792,17 +1936,37 @@ public abstract class Server {
         }
         rpcMetrics.incrAuthorizationSuccesses();
       } catch (AuthorizationException ae) {
+        LOG.info("Connection from " + this
+            + " for protocol " + connectionContext.getProtocol()
+            + " is unauthorized for user " + user);
         rpcMetrics.incrAuthorizationFailures();
-        setupResponse(authFailedResponse, authFailedCall, 
-            RpcStatusProto.FATAL,  RpcErrorCodeProto.FATAL_UNAUTHORIZED, null,
-            ae.getClass().getName(), ae.getMessage());
-        responder.doRespond(authFailedCall);
-        return false;
+        throw new WrappedRpcServerException(
+            RpcErrorCodeProto.FATAL_UNAUTHORIZED, ae);
       }
-      return true;
     }
     
     /**
+     * Decode a protobuf from the given input stream 
+     * @param builder - Builder of the protobuf to decode
+     * @param dis - DataInputStream to read the protobuf
+     * @return Message - decoded protobuf
+     * @throws WrappedRpcServerException - deserialization failed
+     */
+    @SuppressWarnings("unchecked")
+    private <T extends Message> T decodeProtobufFromStream(Builder builder,
+        DataInputStream dis) throws WrappedRpcServerException {
+      try {
+        builder.mergeDelimitedFrom(dis);
+        return (T)builder.build();
+      } catch (Exception ioe) {
+        Class<?> protoClass = builder.getDefaultInstanceForType().getClass();
+        throw new WrappedRpcServerException(
+            RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST,
+            "Error decoding " + protoClass.getSimpleName() + ": "+ ioe);
+      }
+    }
+
+    /**
      * Get service class for connection
      * @return the serviceClass
      */
@@ -1818,7 +1982,7 @@ public abstract class Server {
       this.serviceClass = serviceClass;
     }
 
-    private synchronized void close() throws IOException {
+    private synchronized void close() {
       disposeSasl();
       data = null;
       dataLengthBuffer = null;
@@ -1851,8 +2015,7 @@ public abstract class Server {
         try {
           final Call call = callQueue.take(); // pop the queue; maybe blocked here
           if (LOG.isDebugEnabled()) {
-            LOG.debug(getName() + ": has Call#" + call.callId + 
-                "for RpcKind " + call.rpcKind + " from " + call.connection);
+            LOG.debug(getName() + ": " + call + " for RpcKind " + call.rpcKind);
           }
           String errorClass = null;
           String error = null;
@@ -2050,17 +2213,23 @@ public abstract class Server {
   private RpcSaslProto buildNegotiateResponse(List<AuthMethod> authMethods)
       throws IOException {
     RpcSaslProto.Builder negotiateBuilder = RpcSaslProto.newBuilder();
-    negotiateBuilder.setState(SaslState.NEGOTIATE);
-    for (AuthMethod authMethod : authMethods) {
-      if (authMethod == AuthMethod.SIMPLE) { // not a SASL method
-        continue;
-      }
-      SaslRpcServer saslRpcServer = new SaslRpcServer(authMethod);      
-      negotiateBuilder.addAuthsBuilder()
-          .setMethod(authMethod.toString())
-          .setMechanism(saslRpcServer.mechanism)
-          .setProtocol(saslRpcServer.protocol)
-          .setServerId(saslRpcServer.serverId);
+    if (authMethods.contains(AuthMethod.SIMPLE) && authMethods.size() == 1) {
+      // SIMPLE-only servers return success in response to negotiate
+      negotiateBuilder.setState(SaslState.SUCCESS);
+    } else {
+      negotiateBuilder.setState(SaslState.NEGOTIATE);
+      for (AuthMethod authMethod : authMethods) {
+        SaslRpcServer saslRpcServer = new SaslRpcServer(authMethod);      
+        SaslAuth.Builder builder = negotiateBuilder.addAuthsBuilder()
+            .setMethod(authMethod.toString())
+            .setMechanism(saslRpcServer.mechanism);
+        if (saslRpcServer.protocol != null) {
+          builder.setProtocol(saslRpcServer.protocol);
+        }
+        if (saslRpcServer.serverId != null) {
+          builder.setServerId(saslRpcServer.serverId);
+        }
+      }
     }
     return negotiateBuilder.build();
   }
@@ -2095,10 +2264,7 @@ public abstract class Server {
       if (connectionList.remove(connection))
         numConnections--;
     }
-    try {
-      connection.close();
-    } catch (IOException e) {
-    }
+    connection.close();
   }
   
   /**
@@ -2120,9 +2286,11 @@ public abstract class Server {
     DataOutputStream out = new DataOutputStream(responseBuf);
     RpcResponseHeaderProto.Builder headerBuilder =  
         RpcResponseHeaderProto.newBuilder();
+    headerBuilder.setClientId(ByteString.copyFrom(call.clientId));
     headerBuilder.setCallId(call.callId);
+    headerBuilder.setRetryCount(call.retryCount);
     headerBuilder.setStatus(status);
-    headerBuilder.setServerIpcVersionNum(Server.CURRENT_VERSION);
+    headerBuilder.setServerIpcVersionNum(CURRENT_VERSION);
 
     if (status == RpcStatusProto.SUCCESS) {
       RpcResponseHeaderProto header = headerBuilder.build();
@@ -2206,18 +2374,6 @@ public abstract class Server {
   }
   
   
-  private void respondBadRpcHeader(Call call, String errorClass, String error)
-      throws IOException
-  {
-    ByteArrayOutputStream responseBuf = new ByteArrayOutputStream();
-    setupResponse(responseBuf, call, 
-        RpcStatusProto.FATAL,  RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
-        null, errorClass, error);
-    responder.doRespond(call);
-    return; 
-    
-  }
-  
   private void wrapWithSasl(ByteArrayOutputStream response, Call call)
       throws IOException {
     if (call.connection.saslServer != null) {
@@ -2231,9 +2387,21 @@ public abstract class Server {
         LOG.debug("Adding saslServer wrapped token of size " + token.length
             + " as call response.");
       response.reset();
-      DataOutputStream saslOut = new DataOutputStream(response);
-      saslOut.writeInt(token.length);
-      saslOut.write(token, 0, token.length);
+      // rebuild with sasl header and payload
+      RpcResponseHeaderProto saslHeader = RpcResponseHeaderProto.newBuilder()
+          .setCallId(AuthProtocol.SASL.callId)
+          .setStatus(RpcStatusProto.SUCCESS)
+          .build();
+      RpcSaslProto saslMessage = RpcSaslProto.newBuilder()
+          .setState(SaslState.WRAP)
+          .setToken(ByteString.copyFrom(token, 0, token.length))
+          .build();
+      RpcResponseMessageWrapper saslResponse =
+          new RpcResponseMessageWrapper(saslHeader, saslMessage);
+
+      DataOutputStream out = new DataOutputStream(response);
+      out.writeInt(saslResponse.getLength());
+      saslResponse.write(out);
     }
   }
   

Propchange: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/file/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/ganglia/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/spi/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/util/MetricsDynamicMBeanBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/util/MetricsDynamicMBeanBase.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/util/MetricsDynamicMBeanBase.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/util/MetricsDynamicMBeanBase.java Mon Aug 12 21:25:49 2013
@@ -129,7 +129,7 @@ public abstract class MetricsDynamicMBea
   @Override
   public Object getAttribute(String attributeName) throws AttributeNotFoundException,
       MBeanException, ReflectionException {
-    if (attributeName == null || attributeName.equals("")) 
+    if (attributeName == null || attributeName.isEmpty()) 
       throw new IllegalArgumentException();
     
     updateMbeanInfoIfMetricsListChanged();
@@ -197,7 +197,7 @@ public abstract class MetricsDynamicMBea
   public Object invoke(String actionName, Object[] parms, String[] signature)
       throws MBeanException, ReflectionException {
     
-    if (actionName == null || actionName.equals("")) 
+    if (actionName == null || actionName.isEmpty()) 
       throw new IllegalArgumentException();
     
     

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java Mon Aug 12 21:25:49 2013
@@ -384,7 +384,7 @@ public class MetricsSystemImpl extends M
   private void snapshotMetrics(MetricsSourceAdapter sa,
                                MetricsBufferBuilder bufferBuilder) {
     long startTime = Time.now();
-    bufferBuilder.add(sa.name(), sa.getMetrics(collector, false));
+    bufferBuilder.add(sa.name(), sa.getMetrics(collector, true));
     collector.clear();
     snapshotStat.add(Time.now() - startTime);
     LOG.debug("Snapshotted source "+ sa.name());

Propchange: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/record/CsvRecordInput.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/record/CsvRecordInput.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/record/CsvRecordInput.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/record/CsvRecordInput.java Mon Aug 12 21:25:49 2013
@@ -144,7 +144,7 @@ public class CsvRecordInput implements R
     
   @Override
   public void startRecord(String tag) throws IOException {
-    if (tag != null && !"".equals(tag)) {
+    if (tag != null && !tag.isEmpty()) {
       char c1 = (char) stream.read();
       char c2 = (char) stream.read();
       if (c1 != 's' || c2 != '{') {
@@ -156,7 +156,7 @@ public class CsvRecordInput implements R
   @Override
   public void endRecord(String tag) throws IOException {
     char c = (char) stream.read();
-    if (tag == null || "".equals(tag)) {
+    if (tag == null || tag.isEmpty()) {
       if (c != '\n' && c != '\r') {
         throw new IOException("Error deserializing record.");
       } else {

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/record/CsvRecordOutput.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/record/CsvRecordOutput.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/record/CsvRecordOutput.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/record/CsvRecordOutput.java Mon Aug 12 21:25:49 2013
@@ -115,7 +115,7 @@ public class CsvRecordOutput implements 
     
   @Override
   public void startRecord(Record r, String tag) throws IOException {
-    if (tag != null && !"".equals(tag)) {
+    if (tag != null && ! tag.isEmpty()) {
       printCommaUnlessFirst();
       stream.print("s{");
       isFirst = true;
@@ -124,7 +124,7 @@ public class CsvRecordOutput implements 
     
   @Override
   public void endRecord(Record r, String tag) throws IOException {
-    if (tag == null || "".equals(tag)) {
+    if (tag == null || tag.isEmpty()) {
       stream.print("\n");
       isFirst = true;
     } else {

Propchange: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/record/compiler/generated/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/record/compiler/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/record/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/RefreshUserMappingsProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/RefreshUserMappingsProtocol.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/RefreshUserMappingsProtocol.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/RefreshUserMappingsProtocol.java Mon Aug 12 21:25:49 2013
@@ -22,6 +22,7 @@ import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.io.retry.Idempotent;
 import org.apache.hadoop.security.KerberosInfo;
 
 /**
@@ -43,12 +44,13 @@ public interface RefreshUserMappingsProt
    * Refresh user to group mappings.
    * @throws IOException
    */
+  @Idempotent
   public void refreshUserToGroupsMappings() throws IOException;
   
   /**
    * Refresh superuser proxy group list
    * @throws IOException
    */
-  public void refreshSuperUserGroupsConfiguration() 
-  throws IOException;
+  @Idempotent
+  public void refreshSuperUserGroupsConfiguration() throws IOException;
 }

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java Mon Aug 12 21:25:49 2013
@@ -20,18 +20,27 @@ package org.apache.hadoop.security;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.FilterInputStream;
+import java.io.FilterOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
+import java.util.regex.Pattern;
 
 import javax.security.auth.callback.Callback;
 import javax.security.auth.callback.CallbackHandler;
 import javax.security.auth.callback.NameCallback;
 import javax.security.auth.callback.PasswordCallback;
 import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.kerberos.KerberosPrincipal;
 import javax.security.sasl.RealmCallback;
 import javax.security.sasl.RealmChoiceCallback;
 import javax.security.sasl.Sasl;
@@ -42,10 +51,13 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.GlobPattern;
 import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcRequestMessageWrapper;
 import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseMessageWrapper;
 import org.apache.hadoop.ipc.RPC.RpcKind;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.RpcConstants;
 import org.apache.hadoop.ipc.Server.AuthProtocol;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto;
@@ -57,8 +69,11 @@ import org.apache.hadoop.security.SaslRp
 import org.apache.hadoop.security.authentication.util.KerberosName;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenInfo;
+import org.apache.hadoop.security.token.TokenSelector;
 import org.apache.hadoop.util.ProtoUtil;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
 /**
  * A utility class that encapsulates SASL logic for RPC client
@@ -68,54 +83,155 @@ import com.google.protobuf.ByteString;
 public class SaslRpcClient {
   public static final Log LOG = LogFactory.getLog(SaslRpcClient.class);
 
-  private final AuthMethod authMethod;
-  private final SaslClient saslClient;
-  private final boolean fallbackAllowed;
-  private static final RpcRequestHeaderProto saslHeader =
-      ProtoUtil.makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
-          OperationProto.RPC_FINAL_PACKET, AuthProtocol.SASL.callId);
+  private final UserGroupInformation ugi;
+  private final Class<?> protocol;
+  private final InetSocketAddress serverAddr;  
+  private final Configuration conf;
+
+  private SaslClient saslClient;
+  private AuthMethod authMethod;
+  
+  private static final RpcRequestHeaderProto saslHeader = ProtoUtil
+      .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
+          OperationProto.RPC_FINAL_PACKET, AuthProtocol.SASL.callId,
+          RpcConstants.INVALID_RETRY_COUNT, RpcConstants.DUMMY_CLIENT_ID);
   private static final RpcSaslProto negotiateRequest =
       RpcSaslProto.newBuilder().setState(SaslState.NEGOTIATE).build();
   
   /**
-   * Create a SaslRpcClient for an authentication method
+   * Create a SaslRpcClient that can be used by a RPC client to negotiate
+   * SASL authentication with a RPC server
+   * @param ugi - connecting user
+   * @param protocol - RPC protocol
+   * @param serverAddr - InetSocketAddress of remote server
+   * @param conf - Configuration
+   */
+  public SaslRpcClient(UserGroupInformation ugi, Class<?> protocol,
+      InetSocketAddress serverAddr, Configuration conf) {
+    this.ugi = ugi;
+    this.protocol = protocol;
+    this.serverAddr = serverAddr;
+    this.conf = conf;
+  }
+  
+  @VisibleForTesting
+  @InterfaceAudience.Private
+  public Object getNegotiatedProperty(String key) {
+    return (saslClient != null) ? saslClient.getNegotiatedProperty(key) : null;
+  }
+  
+
+  // the RPC Client has an inelegant way of handling expiration of TGTs
+  // acquired via a keytab.  any connection failure causes a relogin, so
+  // the Client needs to know what authMethod was being attempted if an
+  // exception occurs.  the SASL prep for a kerberos connection should
+  // ideally relogin if necessary instead of exposing this detail to the
+  // Client
+  @InterfaceAudience.Private
+  public AuthMethod getAuthMethod() {
+    return authMethod;
+  }
+  
+  /**
+   * Instantiate a sasl client for the first supported auth type in the
+   * given list.  The auth type must be defined, enabled, and the user
+   * must possess the required credentials, else the next auth is tried.
    * 
-   * @param method
-   *          the requested authentication method
-   * @param token
-   *          token to use if needed by the authentication method
+   * @param authTypes to attempt in the given order
+   * @return SaslAuth of instantiated client
+   * @throws AccessControlException - client doesn't support any of the auths
+   * @throws IOException - misc errors
    */
-  public SaslRpcClient(AuthMethod method,
-      Token<? extends TokenIdentifier> token, String serverPrincipal,
-      boolean fallbackAllowed)
-      throws IOException {
-    this.authMethod = method;
-    this.fallbackAllowed = fallbackAllowed;
+  private SaslAuth selectSaslClient(List<SaslAuth> authTypes)
+      throws SaslException, AccessControlException, IOException {
+    SaslAuth selectedAuthType = null;
+    boolean switchToSimple = false;
+    for (SaslAuth authType : authTypes) {
+      if (!isValidAuthType(authType)) {
+        continue; // don't know what it is, try next
+      }
+      AuthMethod authMethod = AuthMethod.valueOf(authType.getMethod());
+      if (authMethod == AuthMethod.SIMPLE) {
+        switchToSimple = true;
+      } else {
+        saslClient = createSaslClient(authType);
+        if (saslClient == null) { // client lacks credentials, try next
+          continue;
+        }
+      }
+      selectedAuthType = authType;
+      break;
+    }
+    if (saslClient == null && !switchToSimple) {
+      List<String> serverAuthMethods = new ArrayList<String>();
+      for (SaslAuth authType : authTypes) {
+        serverAuthMethods.add(authType.getMethod());
+      }
+      throw new AccessControlException(
+          "Client cannot authenticate via:" + serverAuthMethods);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Use " + selectedAuthType.getMethod() +
+          " authentication for protocol " + protocol.getSimpleName());
+    }
+    return selectedAuthType;
+  }
+  
+
+  private boolean isValidAuthType(SaslAuth authType) {
+    AuthMethod authMethod;
+    try {
+      authMethod = AuthMethod.valueOf(authType.getMethod());
+    } catch (IllegalArgumentException iae) { // unknown auth
+      authMethod = null;
+    }
+    // do we know what it is?  is it using our mechanism?
+    return authMethod != null &&
+           authMethod.getMechanismName().equals(authType.getMechanism());
+  }  
+  
+  /**
+   * Try to create a SaslClient for an authentication type.  May return
+   * null if the type isn't supported or the client lacks the required
+   * credentials.
+   * 
+   * @param authType - the requested authentication method
+   * @return SaslClient for the authType or null
+   * @throws SaslException - error instantiating client
+   * @throws IOException - misc errors
+   */
+  private SaslClient createSaslClient(SaslAuth authType)
+      throws SaslException, IOException {
     String saslUser = null;
-    String saslProtocol = null;
-    String saslServerName = null;
+    // SASL requires the client and server to use the same proto and serverId
+    // if necessary, auth types below will verify they are valid
+    final String saslProtocol = authType.getProtocol();
+    final String saslServerName = authType.getServerId();
     Map<String, String> saslProperties = SaslRpcServer.SASL_PROPS;
     CallbackHandler saslCallback = null;
     
+    final AuthMethod method = AuthMethod.valueOf(authType.getMethod());
     switch (method) {
       case TOKEN: {
-        saslProtocol = "";
-        saslServerName = SaslRpcServer.SASL_DEFAULT_REALM;
+        Token<?> token = getServerToken(authType);
+        if (token == null) {
+          return null; // tokens aren't supported or user doesn't have one
+        }
         saslCallback = new SaslClientCallbackHandler(token);
         break;
       }
       case KERBEROS: {
-        if (serverPrincipal == null || serverPrincipal.isEmpty()) {
-          throw new IOException(
-              "Failed to specify server's Kerberos principal name");
-        }
-        KerberosName name = new KerberosName(serverPrincipal);
-        saslProtocol = name.getServiceName();
-        saslServerName = name.getHostName();
-        if (saslServerName == null) {
-          throw new IOException(
-              "Kerberos principal name does NOT have the expected hostname part: "
-                  + serverPrincipal);
+        if (ugi.getRealAuthenticationMethod().getAuthMethod() !=
+            AuthMethod.KERBEROS) {
+          return null; // client isn't using kerberos
+        }
+        String serverPrincipal = getServerPrincipal(authType);
+        if (serverPrincipal == null) {
+          return null; // protocol doesn't use kerberos
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("RPC Server's Kerberos principal name for protocol="
+              + protocol.getCanonicalName() + " is " + serverPrincipal);
         }
         break;
       }
@@ -125,16 +241,93 @@ public class SaslRpcClient {
     
     String mechanism = method.getMechanismName();
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Creating SASL " + mechanism + "(" + authMethod + ") "
+      LOG.debug("Creating SASL " + mechanism + "(" + method + ") "
           + " client to authenticate to service at " + saslServerName);
     }
-    saslClient = Sasl.createSaslClient(
+    return Sasl.createSaslClient(
         new String[] { mechanism }, saslUser, saslProtocol, saslServerName,
         saslProperties, saslCallback);
-    if (saslClient == null) {
-      throw new IOException("Unable to find SASL client implementation");
+  }
+  
+  /**
+   * Try to locate the required token for the server.
+   * 
+   * @param authType of the SASL client
+   * @return Token<?> for server, or null if no token available
+   * @throws IOException - token selector cannot be instantiated
+   */
+  private Token<?> getServerToken(SaslAuth authType) throws IOException {
+    TokenInfo tokenInfo = SecurityUtil.getTokenInfo(protocol, conf);
+    LOG.debug("Get token info proto:"+protocol+" info:"+tokenInfo);
+    if (tokenInfo == null) { // protocol has no support for tokens
+      return null;
+    }
+    TokenSelector<?> tokenSelector = null;
+    try {
+      tokenSelector = tokenInfo.value().newInstance();
+    } catch (InstantiationException e) {
+      throw new IOException(e.toString());
+    } catch (IllegalAccessException e) {
+      throw new IOException(e.toString());
+    }
+    return tokenSelector.selectToken(
+        SecurityUtil.buildTokenService(serverAddr), ugi.getTokens());
+  }
+  
+  /**
+   * Get the remote server's principal.  The value will be obtained from
+   * the config and cross-checked against the server's advertised principal.
+   * 
+   * @param authType of the SASL client
+   * @return String of the server's principal
+   * @throws IOException - error determining configured principal
+   */
+  @VisibleForTesting
+  String getServerPrincipal(SaslAuth authType) throws IOException {
+    KerberosInfo krbInfo = SecurityUtil.getKerberosInfo(protocol, conf);
+    LOG.debug("Get kerberos info proto:"+protocol+" info:"+krbInfo);
+    if (krbInfo == null) { // protocol has no support for kerberos
+      return null;
+    }
+    String serverKey = krbInfo.serverPrincipal();
+    if (serverKey == null) {
+      throw new IllegalArgumentException(
+          "Can't obtain server Kerberos config key from protocol="
+              + protocol.getCanonicalName());
+    }
+    // construct server advertised principal for comparison
+    String serverPrincipal = new KerberosPrincipal(
+        authType.getProtocol() + "/" + authType.getServerId()).getName();
+    boolean isPrincipalValid = false;
+
+    // use the pattern if defined
+    String serverKeyPattern = conf.get(serverKey + ".pattern");
+    if (serverKeyPattern != null && !serverKeyPattern.isEmpty()) {
+      Pattern pattern = GlobPattern.compile(serverKeyPattern);
+      isPrincipalValid = pattern.matcher(serverPrincipal).matches();
+    } else {
+      // check that the server advertised principal matches our conf
+      String confPrincipal = SecurityUtil.getServerPrincipal(
+          conf.get(serverKey), serverAddr.getAddress());
+      if (confPrincipal == null || confPrincipal.isEmpty()) {
+        throw new IllegalArgumentException(
+            "Failed to specify server's Kerberos principal name");
+      }
+      KerberosName name = new KerberosName(confPrincipal);
+      if (name.getHostName() == null) {
+        throw new IllegalArgumentException(
+            "Kerberos principal name does NOT have the expected hostname part: "
+                + confPrincipal);
+      }
+      isPrincipalValid = serverPrincipal.equals(confPrincipal);
+    }
+    if (!isPrincipalValid) {
+      throw new IllegalArgumentException(
+          "Server has invalid Kerberos principal: " + serverPrincipal);
     }
+    return serverPrincipal;
   }
+  
 
   /**
    * Do client side SASL authentication with server via the given InputStream
@@ -144,18 +337,19 @@ public class SaslRpcClient {
    *          InputStream to use
    * @param outS
    *          OutputStream to use
-   * @return true if connection is set up, or false if needs to switch 
-   *             to simple Auth.
+   * @return AuthMethod used to negotiate the connection
    * @throws IOException
    */
-  public boolean saslConnect(InputStream inS, OutputStream outS)
+  public AuthMethod saslConnect(InputStream inS, OutputStream outS)
       throws IOException {
     DataInputStream inStream = new DataInputStream(new BufferedInputStream(inS));
     DataOutputStream outStream = new DataOutputStream(new BufferedOutputStream(
         outS));
     
-    // track if SASL ever started, or server switched us to simple
-    boolean inSasl = false;
+    // redefined if/when a SASL negotiation starts, can be queried if the
+    // negotiation fails
+    authMethod = AuthMethod.SIMPLE;
+    
     sendSaslMessage(outStream, negotiateRequest);
     
     // loop until sasl is complete or a rpc error occurs
@@ -189,50 +383,49 @@ public class SaslRpcClient {
       RpcSaslProto.Builder response = null;
       switch (saslMessage.getState()) {
         case NEGOTIATE: {
-          inSasl = true;
-          // TODO: should instantiate sasl client based on advertisement
-          // but just blindly use the pre-instantiated sasl client for now
-          String clientAuthMethod = authMethod.toString();
-          SaslAuth saslAuthType = null;
-          for (SaslAuth authType : saslMessage.getAuthsList()) {
-            if (clientAuthMethod.equals(authType.getMethod())) {
-              saslAuthType = authType;
-              break;
-            }
-          }
-          if (saslAuthType == null) {
-            saslAuthType = SaslAuth.newBuilder()
-                .setMethod(clientAuthMethod)
-                .setMechanism(saslClient.getMechanismName())
-                .build();
-          }
+          // create a compatible SASL client, throws if no supported auths
+          SaslAuth saslAuthType = selectSaslClient(saslMessage.getAuthsList());
+          // define auth being attempted, caller can query if connect fails
+          authMethod = AuthMethod.valueOf(saslAuthType.getMethod());
           
-          byte[] challengeToken = null;
-          if (saslAuthType != null && saslAuthType.hasChallenge()) {
-            // server provided the first challenge
-            challengeToken = saslAuthType.getChallenge().toByteArray();
-            saslAuthType =
-              SaslAuth.newBuilder(saslAuthType).clearChallenge().build();
-          } else if (saslClient.hasInitialResponse()) {
-            challengeToken = new byte[0];
+          byte[] responseToken = null;
+          if (authMethod == AuthMethod.SIMPLE) { // switching to SIMPLE
+            done = true; // not going to wait for success ack
+          } else {
+            byte[] challengeToken = null;
+            if (saslAuthType.hasChallenge()) {
+              // server provided the first challenge
+              challengeToken = saslAuthType.getChallenge().toByteArray();
+              saslAuthType =
+                  SaslAuth.newBuilder(saslAuthType).clearChallenge().build();
+            } else if (saslClient.hasInitialResponse()) {
+              challengeToken = new byte[0];
+            }
+            responseToken = (challengeToken != null)
+                ? saslClient.evaluateChallenge(challengeToken)
+                    : new byte[0];
           }
-          byte[] responseToken = (challengeToken != null)
-              ? saslClient.evaluateChallenge(challengeToken)
-              : new byte[0];
-          
           response = createSaslReply(SaslState.INITIATE, responseToken);
           response.addAuths(saslAuthType);
           break;
         }
         case CHALLENGE: {
-          inSasl = true;
+          if (saslClient == null) {
+            // should probably instantiate a client to allow a server to
+            // demand a specific negotiation
+            throw new SaslException("Server sent unsolicited challenge");
+          }
           byte[] responseToken = saslEvaluateToken(saslMessage, false);
           response = createSaslReply(SaslState.RESPONSE, responseToken);
           break;
         }
         case SUCCESS: {
-          if (inSasl && saslEvaluateToken(saslMessage, true) != null) {
-            throw new SaslException("SASL client generated spurious token");
+          // simple server sends immediate success to a SASL client for
+          // switch to simple
+          if (saslClient == null) {
+            authMethod = AuthMethod.SIMPLE;
+          } else {
+            saslEvaluateToken(saslMessage, true);
           }
           done = true;
           break;
@@ -246,12 +439,7 @@ public class SaslRpcClient {
         sendSaslMessage(outStream, response.build());
       }
     } while (!done);
-    if (!inSasl && !fallbackAllowed) {
-      throw new IOException("Server asks us to fall back to SIMPLE " +
-          "auth, but this client is configured to only allow secure " +
-          "connections.");
-    }
-    return inSasl;
+    return authMethod;
   }
   
   private void sendSaslMessage(DataOutputStream out, RpcSaslProto message)
@@ -266,17 +454,37 @@ public class SaslRpcClient {
     out.flush();    
   }
   
+  /**
+   * Evaluate the server provided challenge.  The server must send a token
+   * if it's not done.  If the server is done, the challenge token is
+   * optional because not all mechanisms send a final token for the client to
+   * update its internal state.  The client must also be done after
+   * evaluating the optional token to ensure a malicious server doesn't
+   * prematurely end the negotiation with a phony success.
+   *  
+   * @param saslResponse - server message that may carry a challenge token
+   * @param serverIsDone - server negotiation state
+   * @throws SaslException - any problems with negotiation
+   */
   private byte[] saslEvaluateToken(RpcSaslProto saslResponse,
-      boolean done) throws SaslException {
+      boolean serverIsDone) throws SaslException {
     byte[] saslToken = null;
     if (saslResponse.hasToken()) {
       saslToken = saslResponse.getToken().toByteArray();
       saslToken = saslClient.evaluateChallenge(saslToken);
-    } else if (!done) {
-      throw new SaslException("Challenge contains no token");
-    }
-    if (done && !saslClient.isComplete()) {
-      throw new SaslException("Client is out of sync with server");
+    } else if (!serverIsDone) {
+      // the server may only omit a token when it's done
+      throw new SaslException("Server challenge contains no token");
+    }
+    if (serverIsDone) {
+      // server tried to report success before our client completed
+      if (!saslClient.isComplete()) {
+        throw new SaslException("Client is out of sync with server");
+      }
+      // a client cannot generate a response to a success message
+      if (saslToken != null) {
+        throw new SaslException("Client generated spurious response");        
+      }
     }
     return saslToken;
   }
@@ -291,41 +499,147 @@ public class SaslRpcClient {
     return response;
   }
 
+  private boolean useWrap() {
+    // getNegotiatedProperty throws if client isn't complete
+    String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP);
+    // SASL wrapping is only used if the connection has a QOP, and
+    // the value is not auth.  ex. auth-int & auth-priv
+    return qop != null && !"auth".equalsIgnoreCase(qop);
+  }
+  
   /**
-   * Get a SASL wrapped InputStream. Can be called only after saslConnect() has
-   * been called.
+   * Get SASL wrapped InputStream if SASL QoP requires unwrapping,
+   * otherwise return original stream.  Can be called only after
+   * saslConnect() has been called.
    * 
-   * @param in
-   *          the InputStream to wrap
-   * @return a SASL wrapped InputStream
+   * @param in - InputStream used to make the connection
+   * @return InputStream that may be using SASL unwrap
    * @throws IOException
    */
   public InputStream getInputStream(InputStream in) throws IOException {
-    if (!saslClient.isComplete()) {
-      throw new IOException("Sasl authentication exchange hasn't completed yet");
+    if (useWrap()) {
+      in = new WrappedInputStream(in);
     }
-    return new SaslInputStream(in, saslClient);
+    return in;
   }
 
   /**
-   * Get a SASL wrapped OutputStream. Can be called only after saslConnect() has
-   * been called.
+   * Get SASL wrapped OutputStream if SASL QoP requires wrapping,
+   * otherwise return original stream.  Can be called only after
+   * saslConnect() has been called.
    * 
-   * @param out
-   *          the OutputStream to wrap
-   * @return a SASL wrapped OutputStream
+   * @param out - OutputStream used to make the connection
+   * @return OutputStream that may be using SASL wrap
    * @throws IOException
    */
   public OutputStream getOutputStream(OutputStream out) throws IOException {
-    if (!saslClient.isComplete()) {
-      throw new IOException("Sasl authentication exchange hasn't completed yet");
+    if (useWrap()) {
+      // the client and server negotiate a maximum buffer size that can be
+      // wrapped
+      String maxBuf = (String)saslClient.getNegotiatedProperty(Sasl.RAW_SEND_SIZE);
+      out = new BufferedOutputStream(new WrappedOutputStream(out),
+                                     Integer.parseInt(maxBuf));
+    }
+    return out;
+  }
+
+  // ideally this should be folded into the RPC decoding loop but it's
+  // currently split across Client and SaslRpcClient...
+  class WrappedInputStream extends FilterInputStream {
+    private ByteBuffer unwrappedRpcBuffer = ByteBuffer.allocate(0);
+    public WrappedInputStream(InputStream in) throws IOException {
+      super(in);
+    }
+    
+    @Override
+    public int read() throws IOException {
+      byte[] b = new byte[1];
+      int n = read(b, 0, 1);
+      return (n != -1) ? b[0] : -1;
+    }
+    
+    @Override
+    public int read(byte b[]) throws IOException {
+      return read(b, 0, b.length);
+    }
+
+    @Override
+    public int read(byte[] buf, int off, int len) throws IOException {
+      synchronized(unwrappedRpcBuffer) {
+        // fill the buffer with the next RPC message
+        if (unwrappedRpcBuffer.remaining() == 0) {
+          readNextRpcPacket();
+        }
+        // satisfy as much of the request as possible
+        int readLen = Math.min(len, unwrappedRpcBuffer.remaining());
+        unwrappedRpcBuffer.get(buf, off, readLen);
+        return readLen;
+      }
+    }
+    
+    // all messages must be RPC SASL wrapped, else an exception is thrown
+    private void readNextRpcPacket() throws IOException {
+      LOG.debug("reading next wrapped RPC packet");
+      DataInputStream dis = new DataInputStream(in);
+      int rpcLen = dis.readInt();
+      byte[] rpcBuf = new byte[rpcLen];
+      dis.readFully(rpcBuf);
+      
+      // decode the RPC header
+      ByteArrayInputStream bis = new ByteArrayInputStream(rpcBuf);
+      RpcResponseHeaderProto.Builder headerBuilder =
+          RpcResponseHeaderProto.newBuilder();
+      headerBuilder.mergeDelimitedFrom(bis);
+      
+      boolean isWrapped = false;
+      // Must be SASL wrapped, verify and decode.
+      if (headerBuilder.getCallId() == AuthProtocol.SASL.callId) {
+        RpcSaslProto.Builder saslMessage = RpcSaslProto.newBuilder();
+        saslMessage.mergeDelimitedFrom(bis);
+        if (saslMessage.getState() == SaslState.WRAP) {
+          isWrapped = true;
+          byte[] token = saslMessage.getToken().toByteArray();
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("unwrapping token of length:" + token.length);
+          }
+          token = saslClient.unwrap(token, 0, token.length);
+          unwrappedRpcBuffer = ByteBuffer.wrap(token);
+        }
+      }
+      if (!isWrapped) {
+        throw new SaslException("Server sent non-wrapped response");
+      }
     }
-    return new SaslOutputStream(out, saslClient);
   }
 
+  class WrappedOutputStream extends FilterOutputStream {
+    public WrappedOutputStream(OutputStream out) throws IOException {
+      super(out);
+    }
+    @Override
+    public void write(byte[] buf, int off, int len) throws IOException {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("wrapping token of length:" + len);
+      }
+      buf = saslClient.wrap(buf, off, len);
+      RpcSaslProto saslMessage = RpcSaslProto.newBuilder()
+          .setState(SaslState.WRAP)
+          .setToken(ByteString.copyFrom(buf, 0, buf.length))
+          .build();
+      RpcRequestMessageWrapper request =
+          new RpcRequestMessageWrapper(saslHeader, saslMessage);
+      DataOutputStream dob = new DataOutputStream(out);
+      dob.writeInt(request.getLength());
+      request.write(dob);
+     }
+  }
+  
   /** Release resources used by wrapped saslClient */
   public void dispose() throws SaslException {
-    saslClient.dispose();
+    if (saslClient != null) {
+      saslClient.dispose();
+      saslClient = null;
+    }
   }
 
   private static class SaslClientCallbackHandler implements CallbackHandler {

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcServer.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcServer.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcServer.java Mon Aug 12 21:25:49 2013
@@ -47,7 +47,6 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.Server.Connection;
-import org.apache.hadoop.security.authentication.util.KerberosName;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
@@ -104,12 +103,12 @@ public class SaslRpcServer {
         String fullName = UserGroupInformation.getCurrentUser().getUserName();
         if (LOG.isDebugEnabled())
           LOG.debug("Kerberos principal name is " + fullName);
-        KerberosName krbName = new KerberosName(fullName);
-        serverId = krbName.getHostName();
-        if (serverId == null) {
-          serverId = "";
-        }
-        protocol = krbName.getServiceName();
+        // don't use KerberosName because we don't want auth_to_local
+        String[] parts = fullName.split("[/@]", 2);
+        protocol = parts[0];
+        // should verify service host is present here rather than in create()
+        // but lazy tests are using a UGI that isn't a SPN...
+        serverId = (parts.length < 2) ? "" : parts[1];
         break;
       }
       default:

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java Mon Aug 12 21:25:49 2013
@@ -217,7 +217,7 @@ public class SecurityUtil {
   private static String replacePattern(String[] components, String hostname)
       throws IOException {
     String fqdn = hostname;
-    if (fqdn == null || fqdn.equals("") || fqdn.equals("0.0.0.0")) {
+    if (fqdn == null || fqdn.isEmpty() || fqdn.equals("0.0.0.0")) {
       fqdn = getLocalHostName();
     }
     return components[0] + "/" + fqdn.toLowerCase(Locale.US) + "@" + components[2];
@@ -672,7 +672,7 @@ public class SecurityUtil {
   public static AuthenticationMethod getAuthenticationMethod(Configuration conf) {
     String value = conf.get(HADOOP_SECURITY_AUTHENTICATION, "simple");
     try {
-      return Enum.valueOf(AuthenticationMethod.class, value.toUpperCase());
+      return Enum.valueOf(AuthenticationMethod.class, value.toUpperCase(Locale.ENGLISH));
     } catch (IllegalArgumentException iae) {
       throw new IllegalArgumentException("Invalid attribute value for " +
           HADOOP_SECURITY_AUTHENTICATION + " of " + value);
@@ -685,6 +685,6 @@ public class SecurityUtil {
       authenticationMethod = AuthenticationMethod.SIMPLE;
     }
     conf.set(HADOOP_SECURITY_AUTHENTICATION,
-             authenticationMethod.toString().toLowerCase());
+             authenticationMethod.toString().toLowerCase(Locale.ENGLISH));
   }
 }