You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2013/08/09 01:02:21 UTC
svn commit: r1512091 - in
/hadoop/common/trunk/hadoop-common-project/hadoop-common: ./
src/main/java/org/apache/hadoop/ipc/
src/main/java/org/apache/hadoop/security/ src/main/proto/
Author: jitendra
Date: Thu Aug 8 23:02:20 2013
New Revision: 1512091
URL: http://svn.apache.org/r1512091
Log:
HADOOP-9820. RPCv9 wire protocol is insufficient to support multiplexing. Contributed by Daryn Sharp.
Modified:
hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1512091&r1=1512090&r2=1512091&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt Thu Aug 8 23:02:20 2013
@@ -383,6 +383,8 @@ Release 2.1.0-beta - 2013-08-06
HADOOP-9832. [RPC v9] Add RPC header to client ping (daryn)
+ HADOOP-9820. [RPC v9] Wire protocol is insufficient to support multiplexing. (daryn via jitendra)
+
NEW FEATURES
HADOOP-9283. Add support for running the Hadoop client on AIX. (atm)
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1512091&r1=1512090&r2=1512091&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java Thu Aug 8 23:02:20 2013
@@ -737,12 +737,16 @@ public class Client {
}
if (doPing) {
- this.in = new DataInputStream(new BufferedInputStream(
- new PingInputStream(inStream)));
- } else {
- this.in = new DataInputStream(new BufferedInputStream(inStream));
+ inStream = new PingInputStream(inStream);
+ }
+ this.in = new DataInputStream(new BufferedInputStream(inStream));
+
+ // SASL may have already buffered the stream
+ if (!(outStream instanceof BufferedOutputStream)) {
+ outStream = new BufferedOutputStream(outStream);
}
- this.out = new DataOutputStream(new BufferedOutputStream(outStream));
+ this.out = new DataOutputStream(outStream);
+
writeConnectionContext(remoteId, authMethod);
// update last activity time
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1512091&r1=1512090&r2=1512091&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Thu Aug 8 23:02:20 2013
@@ -73,6 +73,8 @@ import org.apache.hadoop.io.DataOutputBu
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import static org.apache.hadoop.ipc.RpcConstants.*;
+
+import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseMessageWrapper;
import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseWrapper;
import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.RPC.VersionMismatch;
@@ -1274,7 +1276,27 @@ public abstract class Server {
}
private void saslReadAndProcess(DataInputStream dis) throws
- WrappedRpcServerException, IOException, InterruptedException {
+ WrappedRpcServerException, IOException, InterruptedException {
+ final RpcSaslProto saslMessage =
+ decodeProtobufFromStream(RpcSaslProto.newBuilder(), dis);
+ switch (saslMessage.getState()) {
+ case WRAP: {
+ if (!saslContextEstablished || !useWrap) {
+ throw new WrappedRpcServerException(
+ RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
+ new SaslException("Server is not wrapping data"));
+ }
+ // loops over decoded data and calls processOneRpc
+ unwrapPacketAndProcessRpcs(saslMessage.getToken().toByteArray());
+ break;
+ }
+ default:
+ saslProcess(saslMessage);
+ }
+ }
+
+ private void saslProcess(RpcSaslProto saslMessage)
+ throws WrappedRpcServerException, IOException, InterruptedException {
if (saslContextEstablished) {
throw new WrappedRpcServerException(
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
@@ -1283,7 +1305,7 @@ public abstract class Server {
RpcSaslProto saslResponse = null;
try {
try {
- saslResponse = processSaslMessage(dis);
+ saslResponse = processSaslMessage(saslMessage);
} catch (IOException e) {
IOException sendToClient = e;
Throwable cause = e;
@@ -1328,14 +1350,14 @@ public abstract class Server {
// do NOT enable wrapping until the last auth response is sent
if (saslContextEstablished) {
String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
+ // SASL wrapping is only used if the connection has a QOP, and
+ // the value is not auth. ex. auth-int & auth-priv
useWrap = (qop != null && !"auth".equalsIgnoreCase(qop));
}
}
- private RpcSaslProto processSaslMessage(DataInputStream dis)
+ private RpcSaslProto processSaslMessage(RpcSaslProto saslMessage)
throws IOException, InterruptedException {
- final RpcSaslProto saslMessage =
- decodeProtobufFromStream(RpcSaslProto.newBuilder(), dis);
RpcSaslProto saslResponse = null;
final SaslState state = saslMessage.getState(); // required
switch (state) {
@@ -1530,7 +1552,7 @@ public abstract class Server {
dataLengthBuffer.clear();
data.flip();
boolean isHeaderRead = connectionContextRead;
- processRpcRequestPacket(data.array());
+ processOneRpc(data.array());
data = null;
if (!isHeaderRead) {
continue;
@@ -1693,29 +1715,19 @@ public abstract class Server {
}
/**
- * Process a RPC Request - if SASL wrapping is enabled, unwrap the
- * requests and process each one, else directly process the request
- * @param buf - single request or SASL wrapped requests
- * @throws IOException - connection failed to authenticate or authorize,
- * or the request could not be decoded into a Call
+ * Process a wrapped RPC Request - unwrap the SASL packet and process
+ * each embedded RPC request
+ * @param buf - SASL wrapped request of one or more RPCs
+ * @throws IOException - SASL packet cannot be unwrapped
* @throws InterruptedException
*/
- private void processRpcRequestPacket(byte[] buf)
- throws WrappedRpcServerException, IOException, InterruptedException {
- if (saslContextEstablished && useWrap) {
- if (LOG.isDebugEnabled())
- LOG.debug("Have read input token of size " + buf.length
- + " for processing by saslServer.unwrap()");
- final byte[] plaintextData = saslServer.unwrap(buf, 0, buf.length);
- // loops over decoded data and calls processOneRpc
- unwrapPacketAndProcessRpcs(plaintextData);
- } else {
- processOneRpc(buf);
- }
- }
-
private void unwrapPacketAndProcessRpcs(byte[] inBuf)
throws WrappedRpcServerException, IOException, InterruptedException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Have read input token of size " + inBuf.length
+ + " for processing by saslServer.unwrap()");
+ }
+ inBuf = saslServer.unwrap(inBuf, 0, inBuf.length);
ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(
inBuf));
// Read all RPCs contained in the inBuf, even partial ones
@@ -2378,9 +2390,21 @@ public abstract class Server {
LOG.debug("Adding saslServer wrapped token of size " + token.length
+ " as call response.");
response.reset();
- DataOutputStream saslOut = new DataOutputStream(response);
- saslOut.writeInt(token.length);
- saslOut.write(token, 0, token.length);
+ // rebuild with sasl header and payload
+ RpcResponseHeaderProto saslHeader = RpcResponseHeaderProto.newBuilder()
+ .setCallId(AuthProtocol.SASL.callId)
+ .setStatus(RpcStatusProto.SUCCESS)
+ .build();
+ RpcSaslProto saslMessage = RpcSaslProto.newBuilder()
+ .setState(SaslState.WRAP)
+ .setToken(ByteString.copyFrom(token, 0, token.length))
+ .build();
+ RpcResponseMessageWrapper saslResponse =
+ new RpcResponseMessageWrapper(saslHeader, saslMessage);
+
+ DataOutputStream out = new DataOutputStream(response);
+ out.writeInt(saslResponse.getLength());
+ saslResponse.write(out);
}
}
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java?rev=1512091&r1=1512090&r2=1512091&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java Thu Aug 8 23:02:20 2013
@@ -20,12 +20,16 @@ package org.apache.hadoop.security;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
+import java.io.FilterInputStream;
+import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -485,38 +489,141 @@ public class SaslRpcClient {
return response;
}
+ private boolean useWrap() {
+ // getNegotiatedProperty throws if client isn't complete
+ String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP);
+ // SASL wrapping is only used if the connection has a QOP, and
+ // the value is not auth. ex. auth-int & auth-priv
+ return qop != null && !"auth".equalsIgnoreCase(qop);
+ }
+
/**
- * Get a SASL wrapped InputStream. Can be called only after saslConnect() has
- * been called.
+ * Get SASL wrapped InputStream if SASL QoP requires unwrapping,
+ * otherwise return original stream. Can be called only after
+ * saslConnect() has been called.
*
- * @param in
- * the InputStream to wrap
- * @return a SASL wrapped InputStream
+ * @param in - InputStream used to make the connection
+ * @return InputStream that may be using SASL unwrap
* @throws IOException
*/
public InputStream getInputStream(InputStream in) throws IOException {
- if (!saslClient.isComplete()) {
- throw new IOException("Sasl authentication exchange hasn't completed yet");
+ if (useWrap()) {
+ in = new WrappedInputStream(in);
}
- return new SaslInputStream(in, saslClient);
+ return in;
}
/**
- * Get a SASL wrapped OutputStream. Can be called only after saslConnect() has
- * been called.
+ * Get SASL wrapped OutputStream if SASL QoP requires wrapping,
+ * otherwise return original stream. Can be called only after
+ * saslConnect() has been called.
*
- * @param out
- * the OutputStream to wrap
- * @return a SASL wrapped OutputStream
+ * @param out - OutputStream used to make the connection
+ * @return OutputStream that may be using SASL wrap
* @throws IOException
*/
public OutputStream getOutputStream(OutputStream out) throws IOException {
- if (!saslClient.isComplete()) {
- throw new IOException("Sasl authentication exchange hasn't completed yet");
+ if (useWrap()) {
+ // the client and server negotiate a maximum buffer size that can be
+ // wrapped
+ String maxBuf = (String)saslClient.getNegotiatedProperty(Sasl.RAW_SEND_SIZE);
+ out = new BufferedOutputStream(new WrappedOutputStream(out),
+ Integer.parseInt(maxBuf));
+ }
+ return out;
+ }
+
+ // ideally this should be folded into the RPC decoding loop but it's
+ // currently split across Client and SaslRpcClient...
+ class WrappedInputStream extends FilterInputStream {
+ private ByteBuffer unwrappedRpcBuffer = ByteBuffer.allocate(0);
+ public WrappedInputStream(InputStream in) throws IOException {
+ super(in);
+ }
+
+ /**
+ * Read a single byte of unwrapped RPC data by delegating to
+ * {@link #read(byte[], int, int)}.
+ *
+ * @return the next byte as an int in the range 0-255, or -1 if the
+ * delegate reports end of stream
+ * @throws IOException if the underlying read or SASL unwrap fails
+ */
+ @Override
+ public int read() throws IOException {
+ byte[] b = new byte[1];
+ int n = read(b, 0, 1);
+ // mask to an unsigned value: returning the raw signed byte would turn
+ // 0xFF into -1, which callers interpret as end of stream
+ return (n != -1) ? (b[0] & 0xff) : -1;
+ }
+
+ @Override
+ public int read(byte b[]) throws IOException {
+ return read(b, 0, b.length);
+ }
+
+ @Override
+ public int read(byte[] buf, int off, int len) throws IOException {
+ synchronized(unwrappedRpcBuffer) {
+ // fill the buffer with the next RPC message
+ if (unwrappedRpcBuffer.remaining() == 0) {
+ readNextRpcPacket();
+ }
+ // satisfy as much of the request as possible
+ int readLen = Math.min(len, unwrappedRpcBuffer.remaining());
+ unwrappedRpcBuffer.get(buf, off, readLen);
+ return readLen;
+ }
+ }
+
+ // all messages must be RPC SASL wrapped, else an exception is thrown
+ private void readNextRpcPacket() throws IOException {
+ LOG.debug("reading next wrapped RPC packet");
+ DataInputStream dis = new DataInputStream(in);
+ int rpcLen = dis.readInt();
+ byte[] rpcBuf = new byte[rpcLen];
+ dis.readFully(rpcBuf);
+
+ // decode the RPC header
+ ByteArrayInputStream bis = new ByteArrayInputStream(rpcBuf);
+ RpcResponseHeaderProto.Builder headerBuilder =
+ RpcResponseHeaderProto.newBuilder();
+ headerBuilder.mergeDelimitedFrom(bis);
+
+ boolean isWrapped = false;
+ // Must be SASL wrapped, verify and decode.
+ if (headerBuilder.getCallId() == AuthProtocol.SASL.callId) {
+ RpcSaslProto.Builder saslMessage = RpcSaslProto.newBuilder();
+ saslMessage.mergeDelimitedFrom(bis);
+ if (saslMessage.getState() == SaslState.WRAP) {
+ isWrapped = true;
+ byte[] token = saslMessage.getToken().toByteArray();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("unwrapping token of length:" + token.length);
+ }
+ token = saslClient.unwrap(token, 0, token.length);
+ unwrappedRpcBuffer = ByteBuffer.wrap(token);
+ }
+ }
+ if (!isWrapped) {
+ throw new SaslException("Server sent non-wrapped response");
+ }
}
- return new SaslOutputStream(out, saslClient);
}
+ class WrappedOutputStream extends FilterOutputStream {
+ public WrappedOutputStream(OutputStream out) throws IOException {
+ super(out);
+ }
+ @Override
+ public void write(byte[] buf, int off, int len) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("wrapping token of length:" + len);
+ }
+ buf = saslClient.wrap(buf, off, len);
+ RpcSaslProto saslMessage = RpcSaslProto.newBuilder()
+ .setState(SaslState.WRAP)
+ .setToken(ByteString.copyFrom(buf, 0, buf.length))
+ .build();
+ RpcRequestMessageWrapper request =
+ new RpcRequestMessageWrapper(saslHeader, saslMessage);
+ DataOutputStream dob = new DataOutputStream(out);
+ dob.writeInt(request.getLength());
+ request.write(dob);
+ }
+ }
+
/** Release resources used by wrapped saslClient */
public void dispose() throws SaslException {
if (saslClient != null) {
@@ -572,4 +679,4 @@ public class SaslRpcClient {
}
}
}
-}
\ No newline at end of file
+}
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto?rev=1512091&r1=1512090&r2=1512091&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto Thu Aug 8 23:02:20 2013
@@ -141,6 +141,7 @@ message RpcSaslProto {
INITIATE = 2;
CHALLENGE = 3;
RESPONSE = 4;
+ WRAP = 5;
}
message SaslAuth {