You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sr...@apache.org on 2013/12/12 19:56:46 UTC

svn commit: r1550486 - in /hadoop/common/trunk/hadoop-common-project/hadoop-common: CHANGES.txt src/main/java/org/apache/hadoop/ipc/RpcConstants.java src/main/java/org/apache/hadoop/ipc/Server.java

Author: sradia
Date: Thu Dec 12 18:56:45 2013
New Revision: 1550486

URL: http://svn.apache.org/r1550486
Log:
HADOOP-10044 Improve the javadoc of rpc code (sanjay Radia)

Modified:
    hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1550486&r1=1550485&r2=1550486&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt Thu Dec 12 18:56:45 2013
@@ -280,6 +280,8 @@ Trunk (Unreleased)
     HDFS-5471. CacheAdmin -listPools fails when user lacks permissions to view
     all pools (Andrew Wang via Colin Patrick McCabe)
 
+    HADOOP-10044 Improve the javadoc of rpc code (sanjay Radia)
+
   OPTIMIZATIONS
 
     HADOOP-7761. Improve the performance of raw comparisons. (todd)

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java?rev=1550486&r1=1550485&r2=1550486&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java Thu Dec 12 18:56:45 2013
@@ -37,10 +37,24 @@ public class RpcConstants {
   
   public static final int INVALID_RETRY_COUNT = -1;
   
+ /**
+  * The Rpc-connection header is as follows 
+  * +----------------------------------+
+  * |  "hrpc" 4 bytes                  |      
+  * +----------------------------------+
+  * |  Version (1 byte)                |
+  * +----------------------------------+
+  * |  Service Class (1 byte)          |
+  * +----------------------------------+
+  * |  AuthProtocol (1 byte)           |      
+  * +----------------------------------+
+  */
+  
   /**
    * The first four bytes of Hadoop RPC connections
    */
   public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
+  public static final int HEADER_LEN_AFTER_HRPC_PART = 3; // 3 bytes that follow
   
   // 1 : Introduce ping and server does not throw away RPCs
   // 3 : Introduce the protocol into the RPC connection header

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1550486&r1=1550485&r2=1550486&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Thu Dec 12 18:56:45 2013
@@ -1105,6 +1105,9 @@ public abstract class Server {
       this.channel = channel;
       this.lastContact = lastContact;
       this.data = null;
+      
+      // the buffer is initialized to read the "hrpc" and after that to read
+      // the length of the Rpc-packet (i.e 4 bytes)
       this.dataLengthBuffer = ByteBuffer.allocate(4);
       this.unwrappedData = null;
       this.unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
@@ -1200,7 +1203,16 @@ public abstract class Server {
       }
     }
 
-    private Throwable getCauseForInvalidToken(IOException e) {
+    /**
+     * Some exceptions ({@link RetriableException} and {@link StandbyException})
+     * that are wrapped as a cause of parameter e are unwrapped so that they can
+     * be sent as the true cause to the client side. In case of
+     * {@link InvalidToken} we go one level deeper to get the true cause.
+     * 
+     * @param e the exception that may have a cause we want to unwrap.
+     * @return the true cause for some exceptions.
+     */
+    private Throwable getTrueCause(IOException e) {
       Throwable cause = e;
       while (cause != null) {
         if (cause instanceof RetriableException) {
@@ -1223,6 +1235,18 @@ public abstract class Server {
       return e;
     }
     
+    /**
+     * Process saslMessage and send saslResponse back
+     * @param saslMessage received SASL message
+     * @throws WrappedRpcServerException setup failed due to SASL negotiation 
+     *         failure, premature or invalid connection context, or other state 
+     *         errors. This exception needs to be sent to the client. This 
+     *         exception will wrap {@link RetriableException}, 
+     *         {@link InvalidToken}, {@link StandbyException} or 
+     *         {@link SaslException}.
+     * @throws IOException if sending reply fails
+     * @throws InterruptedException
+     */
     private void saslProcess(RpcSaslProto saslMessage)
         throws WrappedRpcServerException, IOException, InterruptedException {
       if (saslContextEstablished) {
@@ -1239,7 +1263,7 @@ public abstract class Server {
           // attempting user could be null
           AUDITLOG.warn(AUTH_FAILED_FOR + this.toString() + ":"
               + attemptingUser + " (" + e.getLocalizedMessage() + ")");
-          throw (IOException) getCauseForInvalidToken(e);
+          throw (IOException) getTrueCause(e);
         }
         
         if (saslServer != null && saslServer.isComplete()) {
@@ -1274,13 +1298,26 @@ public abstract class Server {
       }
     }
     
+    /**
+     * Process a saslMessge.
+     * @param saslMessage received SASL message
+     * @return the sasl response to send back to client
+     * @throws SaslException if authentication or generating response fails, 
+     *                       or SASL protocol mixup
+     * @throws IOException if a SaslServer cannot be created
+     * @throws AccessControlException if the requested authentication type 
+     *         is not supported or trying to re-attempt negotiation.
+     * @throws InterruptedException
+     */
     private RpcSaslProto processSaslMessage(RpcSaslProto saslMessage)
-        throws IOException, InterruptedException {
+        throws SaslException, IOException, AccessControlException,
+        InterruptedException {
       RpcSaslProto saslResponse = null;
       final SaslState state = saslMessage.getState(); // required      
       switch (state) {
         case NEGOTIATE: {
           if (sentNegotiate) {
+            // FIXME shouldn't this be SaslException?
             throw new AccessControlException(
                 "Client already attempted negotiation");
           }
@@ -1402,12 +1439,30 @@ public abstract class Server {
       }
     }
 
+    /**
+     * This method reads in a non-blocking fashion from the channel: 
+     * this method is called repeatedly when data is present in the channel; 
+     * when it has enough data to process one rpc it processes that rpc.
+     * 
+     * On the first pass, it processes the connectionHeader, 
+     * connectionContext (an outOfBand RPC) and at most one RPC request that 
+     * follows that. On future passes it will process at most one RPC request.
+     *  
+     * Quirky things: dataLengthBuffer (4 bytes) is used to read "hrpc" OR 
+     * rpc request length.
+     *    
+     * @return -1 in case of error, else num bytes read so far
+     * @throws WrappedRpcServerException - an exception that has already been 
+     *         sent back to the client that does not require verbose logging
+     *         by the Listener thread
+     * @throws IOException - internal error that should not be returned to
+     *         client, typically failure to respond to client
+     * @throws InterruptedException
+     */
     public int readAndProcess()
         throws WrappedRpcServerException, IOException, InterruptedException {
       while (true) {
-        /* Read at most one RPC. If the header is not read completely yet
-         * then iterate until we read first RPC or until there is no data left.
-         */    
+        // dataLengthBuffer is used to read "hrpc" or the rpc-packet length
         int count = -1;
         if (dataLengthBuffer.remaining() > 0) {
           count = channelRead(channel, dataLengthBuffer);       
@@ -1416,9 +1471,11 @@ public abstract class Server {
         }
         
         if (!connectionHeaderRead) {
-          //Every connection is expected to send the header.
+          // Every connection is expected to send the header;
+          // so far we read "hrpc" of the connection header.
           if (connectionHeaderBuf == null) {
-            connectionHeaderBuf = ByteBuffer.allocate(3);
+            // for the bytes that follow "hrpc", in the connection header
+            connectionHeaderBuf = ByteBuffer.allocate(HEADER_LEN_AFTER_HRPC_PART);
           }
           count = channelRead(channel, connectionHeaderBuf);
           if (count < 0 || connectionHeaderBuf.remaining() > 0) {
@@ -1451,27 +1508,30 @@ public abstract class Server {
           // this may switch us into SIMPLE
           authProtocol = initializeAuthContext(connectionHeaderBuf.get(2));          
           
-          dataLengthBuffer.clear();
+          dataLengthBuffer.clear(); // clear to next read rpc packet len
           connectionHeaderBuf = null;
           connectionHeaderRead = true;
-          continue;
+          continue; // connection header read, now read  4 bytes rpc packet len
         }
         
-        if (data == null) {
+        if (data == null) { // just read 4 bytes -  length of RPC packet
           dataLengthBuffer.flip();
           dataLength = dataLengthBuffer.getInt();
           checkDataLength(dataLength);
+          // Set buffer for reading EXACTLY the RPC-packet length and no more.
           data = ByteBuffer.allocate(dataLength);
         }
-        
+        // Now read the RPC packet
         count = channelRead(channel, data);
         
         if (data.remaining() == 0) {
-          dataLengthBuffer.clear();
+          dataLengthBuffer.clear(); // to read length of future rpc packets
           data.flip();
           boolean isHeaderRead = connectionContextRead;
           processOneRpc(data.array());
           data = null;
+          // the last rpc-request we processed could have simply been the
+          // connectionContext; if so continue to read the first RPC.
           if (!isHeaderRead) {
             continue;
           }
@@ -1508,8 +1568,16 @@ public abstract class Server {
       return authProtocol;
     }
 
+    /**
+     * Process the Sasl's Negotiate request, including the optimization of 
+     * accelerating token negotiation.
+     * @return the response to Negotiate request - the list of enabled 
+     *         authMethods and challenge if the TOKENS are supported. 
+     * @throws SaslException - if attempt to generate challenge fails.
+     * @throws IOException - if it fails to create the SASL server for Tokens
+     */
     private RpcSaslProto buildSaslNegotiateResponse()
-        throws IOException, InterruptedException {
+        throws InterruptedException, SaslException, IOException {
       RpcSaslProto negotiateMessage = negotiateResponse;
       // accelerate token negotiation by sending initial challenge
       // in the negotiation response
@@ -1635,8 +1703,11 @@ public abstract class Server {
     /**
      * Process a wrapped RPC Request - unwrap the SASL packet and process
      * each embedded RPC request 
-     * @param buf - SASL wrapped request of one or more RPCs
+     * @param inBuf - SASL wrapped request of one or more RPCs
      * @throws IOException - SASL packet cannot be unwrapped
+     * @throws WrappedRpcServerException - an exception that has already been 
+     *         sent back to the client that does not require verbose logging
+     *         by the Listener thread
      * @throws InterruptedException
      */    
     private void unwrapPacketAndProcessRpcs(byte[] inBuf)
@@ -1677,13 +1748,21 @@ public abstract class Server {
     }
     
     /**
-     * Process an RPC Request - handle connection setup and decoding of
-     * request into a Call
+     * Process one RPC Request from buffer read from socket stream 
+     *  - decode rpc in a rpc-Call
+     *  - handle out-of-band RPC requests such as the initial connectionContext
+     *  - A successfully decoded RpcCall will be deposited in RPC-Q and
+     *    its response will be sent later when the request is processed.
+     * 
+     * Prior to this call the connectionHeader ("hrpc...") has been handled and
+     * if SASL then SASL has been established and the buf we are passed
+     * has been unwrapped from SASL.
+     * 
      * @param buf - contains the RPC request header and the rpc request
      * @throws IOException - internal error that should not be returned to
      *         client, typically failure to respond to client
-     * @throws WrappedRpcServerException - an exception to be sent back to
-     *         the client that does not require verbose logging by the
+     * @throws WrappedRpcServerException - an exception that is sent back to the
+     *         client in this method and does not require verbose logging by the
      *         Listener thread
      * @throws InterruptedException
      */    
@@ -1753,8 +1832,11 @@ public abstract class Server {
     }
 
     /**
-     * Process an RPC Request - the connection headers and context must
-     * have been already read
+     * Process an RPC Request 
+     *   - the connection headers and context must have been already read.
+     *   - Based on the rpcKind, decode the rpcRequest.
+     *   - A successfully decoded RpcCall will be deposited in RPC-Q and
+     *     its response will be sent later when the request is processed.
      * @param header - RPC request header
      * @param dis - stream to request payload
      * @throws WrappedRpcServerException - due to fatal rpc layer issues such
@@ -1803,7 +1885,8 @@ public abstract class Server {
      * @param dis - stream to request payload
      * @throws WrappedRpcServerException - setup failed due to SASL
      *         negotiation failure, premature or invalid connection context,
-     *         or other state errors 
+     *         or other state errors. This exception needs to be sent to the 
+     *         client.
      * @throws IOException - failed to send a response back to the client
      * @throws InterruptedException
      */