You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2013/08/09 01:31:17 UTC

svn commit: r1512097 - in /hadoop/common/branches/branch-2.1.0-beta/hadoop-common-project/hadoop-common: ./ src/main/java/org/apache/hadoop/ipc/ src/main/java/org/apache/hadoop/security/ src/main/proto/

Author: jitendra
Date: Thu Aug  8 23:31:17 2013
New Revision: 1512097

URL: http://svn.apache.org/r1512097
Log:
svn merge -c 1512091 from trunk for HADOOP-9820. RPCv9 wire protocol is insufficient to support multiplexing. Contributed by Daryn Sharp.

Modified:
    hadoop/common/branches/branch-2.1.0-beta/hadoop-common-project/hadoop-common/CHANGES.txt
    hadoop/common/branches/branch-2.1.0-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
    hadoop/common/branches/branch-2.1.0-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
    hadoop/common/branches/branch-2.1.0-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java
    hadoop/common/branches/branch-2.1.0-beta/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto

Modified: hadoop/common/branches/branch-2.1.0-beta/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1.0-beta/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1512097&r1=1512096&r2=1512097&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1.0-beta/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.1.0-beta/hadoop-common-project/hadoop-common/CHANGES.txt Thu Aug  8 23:31:17 2013
@@ -43,6 +43,8 @@ Release 2.1.0-beta - 2013-08-06
 
     HADOOP-9832. [RPC v9] Add RPC header to client ping (daryn)
 
+    HADOOP-9820. [RPC v9] Wire protocol is insufficient to support multiplexing. (daryn via jitendra)
+
   NEW FEATURES
 
     HADOOP-9283. Add support for running the Hadoop client on AIX. (atm)

Modified: hadoop/common/branches/branch-2.1.0-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1.0-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1512097&r1=1512096&r2=1512097&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1.0-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/branches/branch-2.1.0-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java Thu Aug  8 23:31:17 2013
@@ -684,12 +684,16 @@ public class Client {
           }
         
           if (doPing) {
-            this.in = new DataInputStream(new BufferedInputStream(
-                new PingInputStream(inStream)));
-          } else {
-            this.in = new DataInputStream(new BufferedInputStream(inStream));
+            inStream = new PingInputStream(inStream);
+          }
+          this.in = new DataInputStream(new BufferedInputStream(inStream));
+
+          // SASL may have already buffered the stream
+          if (!(outStream instanceof BufferedOutputStream)) {
+            outStream = new BufferedOutputStream(outStream);
           }
-          this.out = new DataOutputStream(new BufferedOutputStream(outStream));
+          this.out = new DataOutputStream(outStream);
+          
           writeConnectionContext(remoteId, authMethod);
 
           // update last activity time

Modified: hadoop/common/branches/branch-2.1.0-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1.0-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1512097&r1=1512096&r2=1512097&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1.0-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/branch-2.1.0-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Thu Aug  8 23:31:17 2013
@@ -72,6 +72,8 @@ import org.apache.hadoop.io.DataOutputBu
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import static org.apache.hadoop.ipc.RpcConstants.*;
+
+import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseMessageWrapper;
 import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseWrapper;
 import org.apache.hadoop.ipc.RPC.RpcInvoker;
 import org.apache.hadoop.ipc.RPC.VersionMismatch;
@@ -1271,7 +1273,27 @@ public abstract class Server {
     }
 
     private void saslReadAndProcess(DataInputStream dis) throws
-        WrappedRpcServerException, IOException, InterruptedException {
+    WrappedRpcServerException, IOException, InterruptedException {
+      final RpcSaslProto saslMessage =
+          decodeProtobufFromStream(RpcSaslProto.newBuilder(), dis);
+      switch (saslMessage.getState()) {
+        case WRAP: {
+          if (!saslContextEstablished || !useWrap) {
+            throw new WrappedRpcServerException(
+                RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
+                new SaslException("Server is not wrapping data"));
+          }
+          // loops over decoded data and calls processOneRpc
+          unwrapPacketAndProcessRpcs(saslMessage.getToken().toByteArray());
+          break;
+        }
+        default:
+          saslProcess(saslMessage);
+      }
+    }
+
+    private void saslProcess(RpcSaslProto saslMessage)
+        throws WrappedRpcServerException, IOException, InterruptedException {
       if (saslContextEstablished) {
         throw new WrappedRpcServerException(
             RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
@@ -1280,7 +1302,7 @@ public abstract class Server {
       RpcSaslProto saslResponse = null;
       try {
         try {
-          saslResponse = processSaslMessage(dis);
+          saslResponse = processSaslMessage(saslMessage);
         } catch (IOException e) {
           IOException sendToClient = e;
           Throwable cause = e;
@@ -1325,14 +1347,14 @@ public abstract class Server {
       // do NOT enable wrapping until the last auth response is sent
       if (saslContextEstablished) {
         String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
+        // SASL wrapping is only used if the connection has a QOP, and
+        // the value is not auth.  ex. auth-int & auth-priv
         useWrap = (qop != null && !"auth".equalsIgnoreCase(qop));        
       }
     }
     
-    private RpcSaslProto processSaslMessage(DataInputStream dis)
+    private RpcSaslProto processSaslMessage(RpcSaslProto saslMessage)
         throws IOException, InterruptedException {
-      final RpcSaslProto saslMessage =
-          decodeProtobufFromStream(RpcSaslProto.newBuilder(), dis);
       RpcSaslProto saslResponse = null;
       final SaslState state = saslMessage.getState(); // required      
       switch (state) {
@@ -1527,7 +1549,7 @@ public abstract class Server {
           dataLengthBuffer.clear();
           data.flip();
           boolean isHeaderRead = connectionContextRead;
-          processRpcRequestPacket(data.array());
+          processOneRpc(data.array());
           data = null;
           if (!isHeaderRead) {
             continue;
@@ -1690,29 +1712,19 @@ public abstract class Server {
     }
     
     /**
-     * Process a RPC Request - if SASL wrapping is enabled, unwrap the
-     * requests and process each one, else directly process the request 
-     * @param buf - single request or SASL wrapped requests
-     * @throws IOException - connection failed to authenticate or authorize,
-     *   or the request could not be decoded into a Call
+     * Process a wrapped RPC Request - unwrap the SASL packet and process
+     * each embedded RPC request 
+     * @param buf - SASL wrapped request of one or more RPCs
+     * @throws IOException - SASL packet cannot be unwrapped
      * @throws InterruptedException
      */    
-    private void processRpcRequestPacket(byte[] buf)
-        throws WrappedRpcServerException, IOException, InterruptedException {
-      if (saslContextEstablished && useWrap) {
-        if (LOG.isDebugEnabled())
-          LOG.debug("Have read input token of size " + buf.length
-              + " for processing by saslServer.unwrap()");        
-        final byte[] plaintextData = saslServer.unwrap(buf, 0, buf.length);
-        // loops over decoded data and calls processOneRpc
-        unwrapPacketAndProcessRpcs(plaintextData);
-      } else {
-        processOneRpc(buf);
-      }
-    }
-    
     private void unwrapPacketAndProcessRpcs(byte[] inBuf)
         throws WrappedRpcServerException, IOException, InterruptedException {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Have read input token of size " + inBuf.length
+            + " for processing by saslServer.unwrap()");
+      }
+      inBuf = saslServer.unwrap(inBuf, 0, inBuf.length);
       ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(
           inBuf));
       // Read all RPCs contained in the inBuf, even partial ones
@@ -2375,9 +2387,21 @@ public abstract class Server {
         LOG.debug("Adding saslServer wrapped token of size " + token.length
             + " as call response.");
       response.reset();
-      DataOutputStream saslOut = new DataOutputStream(response);
-      saslOut.writeInt(token.length);
-      saslOut.write(token, 0, token.length);
+      // rebuild with sasl header and payload
+      RpcResponseHeaderProto saslHeader = RpcResponseHeaderProto.newBuilder()
+          .setCallId(AuthProtocol.SASL.callId)
+          .setStatus(RpcStatusProto.SUCCESS)
+          .build();
+      RpcSaslProto saslMessage = RpcSaslProto.newBuilder()
+          .setState(SaslState.WRAP)
+          .setToken(ByteString.copyFrom(token, 0, token.length))
+          .build();
+      RpcResponseMessageWrapper saslResponse =
+          new RpcResponseMessageWrapper(saslHeader, saslMessage);
+
+      DataOutputStream out = new DataOutputStream(response);
+      out.writeInt(saslResponse.getLength());
+      saslResponse.write(out);
     }
   }
   

Modified: hadoop/common/branches/branch-2.1.0-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1.0-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java?rev=1512097&r1=1512096&r2=1512097&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1.0-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java (original)
+++ hadoop/common/branches/branch-2.1.0-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java Thu Aug  8 23:31:17 2013
@@ -20,12 +20,16 @@ package org.apache.hadoop.security;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.FilterInputStream;
+import java.io.FilterOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -485,38 +489,141 @@ public class SaslRpcClient {
     return response;
   }
 
+  private boolean useWrap() {
+    // getNegotiatedProperty throws if client isn't complete
+    String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP);
+    // SASL wrapping is only used if the connection has a QOP, and
+    // the value is not auth.  ex. auth-int & auth-priv
+    return qop != null && !"auth".equalsIgnoreCase(qop);
+  }
+  
   /**
-   * Get a SASL wrapped InputStream. Can be called only after saslConnect() has
-   * been called.
+   * Get SASL wrapped InputStream if SASL QoP requires unwrapping,
+   * otherwise return original stream.  Can be called only after
+   * saslConnect() has been called.
    * 
-   * @param in
-   *          the InputStream to wrap
-   * @return a SASL wrapped InputStream
+   * @param in - InputStream used to make the connection
+   * @return InputStream that may be using SASL unwrap
    * @throws IOException
    */
   public InputStream getInputStream(InputStream in) throws IOException {
-    if (!saslClient.isComplete()) {
-      throw new IOException("Sasl authentication exchange hasn't completed yet");
+    if (useWrap()) {
+      in = new WrappedInputStream(in);
     }
-    return new SaslInputStream(in, saslClient);
+    return in;
   }
 
   /**
-   * Get a SASL wrapped OutputStream. Can be called only after saslConnect() has
-   * been called.
+   * Get SASL wrapped OutputStream if SASL QoP requires wrapping,
+   * otherwise return original stream.  Can be called only after
+   * saslConnect() has been called.
    * 
-   * @param out
-   *          the OutputStream to wrap
-   * @return a SASL wrapped OutputStream
+   * @param out - OutputStream used to make the connection
+   * @return OutputStream that may be using SASL wrap
    * @throws IOException
    */
   public OutputStream getOutputStream(OutputStream out) throws IOException {
-    if (!saslClient.isComplete()) {
-      throw new IOException("Sasl authentication exchange hasn't completed yet");
+    if (useWrap()) {
+      // the client and server negotiate a maximum buffer size that can be
+      // wrapped
+      String maxBuf = (String)saslClient.getNegotiatedProperty(Sasl.RAW_SEND_SIZE);
+      out = new BufferedOutputStream(new WrappedOutputStream(out),
+                                     Integer.parseInt(maxBuf));
+    }
+    return out;
+  }
+
+  // ideally this should be folded into the RPC decoding loop but it's
+  // currently split across Client and SaslRpcClient...
+  class WrappedInputStream extends FilterInputStream {
+    private ByteBuffer unwrappedRpcBuffer = ByteBuffer.allocate(0);
+    public WrappedInputStream(InputStream in) throws IOException {
+      super(in);
+    }
+    
+    @Override
+    public int read() throws IOException {
+      byte[] b = new byte[1];
+      int n = read(b, 0, 1);
+      return (n != -1) ? (b[0] & 0xff) : -1; // mask: read() must yield 0-255; a raw 0xFF byte would look like EOF
+    }
+    
+    @Override
+    public int read(byte b[]) throws IOException {
+      return read(b, 0, b.length);
+    }
+
+    @Override
+    public int read(byte[] buf, int off, int len) throws IOException {
+      synchronized(unwrappedRpcBuffer) {
+        // fill the buffer with the next RPC message
+        if (unwrappedRpcBuffer.remaining() == 0) {
+          readNextRpcPacket();
+        }
+        // satisfy as much of the request as possible
+        int readLen = Math.min(len, unwrappedRpcBuffer.remaining());
+        unwrappedRpcBuffer.get(buf, off, readLen);
+        return readLen;
+      }
+    }
+    
+    // all messages must be RPC SASL wrapped, else an exception is thrown
+    private void readNextRpcPacket() throws IOException {
+      LOG.debug("reading next wrapped RPC packet");
+      DataInputStream dis = new DataInputStream(in);
+      int rpcLen = dis.readInt();
+      byte[] rpcBuf = new byte[rpcLen];
+      dis.readFully(rpcBuf);
+      
+      // decode the RPC header
+      ByteArrayInputStream bis = new ByteArrayInputStream(rpcBuf);
+      RpcResponseHeaderProto.Builder headerBuilder =
+          RpcResponseHeaderProto.newBuilder();
+      headerBuilder.mergeDelimitedFrom(bis);
+      
+      boolean isWrapped = false;
+      // Must be SASL wrapped, verify and decode.
+      if (headerBuilder.getCallId() == AuthProtocol.SASL.callId) {
+        RpcSaslProto.Builder saslMessage = RpcSaslProto.newBuilder();
+        saslMessage.mergeDelimitedFrom(bis);
+        if (saslMessage.getState() == SaslState.WRAP) {
+          isWrapped = true;
+          byte[] token = saslMessage.getToken().toByteArray();
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("unwrapping token of length:" + token.length);
+          }
+          token = saslClient.unwrap(token, 0, token.length);
+          unwrappedRpcBuffer = ByteBuffer.wrap(token);
+        }
+      }
+      if (!isWrapped) {
+        throw new SaslException("Server sent non-wrapped response");
+      }
     }
-    return new SaslOutputStream(out, saslClient);
   }
 
+  class WrappedOutputStream extends FilterOutputStream {
+    public WrappedOutputStream(OutputStream out) throws IOException {
+      super(out);
+    }
+    @Override
+    public void write(byte[] buf, int off, int len) throws IOException {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("wrapping token of length:" + len);
+      }
+      buf = saslClient.wrap(buf, off, len);
+      RpcSaslProto saslMessage = RpcSaslProto.newBuilder()
+          .setState(SaslState.WRAP)
+          .setToken(ByteString.copyFrom(buf, 0, buf.length))
+          .build();
+      RpcRequestMessageWrapper request =
+          new RpcRequestMessageWrapper(saslHeader, saslMessage);
+      DataOutputStream dob = new DataOutputStream(out);
+      dob.writeInt(request.getLength());
+      request.write(dob);
+     }
+  }
+  
   /** Release resources used by wrapped saslClient */
   public void dispose() throws SaslException {
     if (saslClient != null) {
@@ -572,4 +679,4 @@ public class SaslRpcClient {
       }
     }
   }
-}
\ No newline at end of file
+}

Modified: hadoop/common/branches/branch-2.1.0-beta/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1.0-beta/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto?rev=1512097&r1=1512096&r2=1512097&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1.0-beta/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto (original)
+++ hadoop/common/branches/branch-2.1.0-beta/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto Thu Aug  8 23:31:17 2013
@@ -141,6 +141,7 @@ message RpcSaslProto {
     INITIATE  = 2;
     CHALLENGE = 3;
     RESPONSE  = 4;
+    WRAP = 5;
   }
   
   message SaslAuth {