You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by da...@apache.org on 2013/07/16 19:59:40 UTC
svn commit: r1503811 - in
/hadoop/common/trunk/hadoop-common-project/hadoop-common: ./
src/main/java/org/apache/hadoop/ipc/ src/test/java/org/apache/hadoop/ipc/
Author: daryn
Date: Tue Jul 16 17:59:39 2013
New Revision: 1503811
URL: http://svn.apache.org/r1503811
Log:
HADOOP-9683. [RPC v9] Wrap IpcConnectionContext in RPC headers (daryn)
Modified:
hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1503811&r1=1503810&r2=1503811&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt Tue Jul 16 17:59:39 2013
@@ -354,6 +354,8 @@ Release 2.1.0-beta - 2013-07-02
HADOOP-9688. Add globally unique Client ID to RPC requests. (suresh)
+ HADOOP-9683. [RPC v9] Wrap IpcConnectionContext in RPC headers (daryn)
+
NEW FEATURES
HADOOP-9283. Add support for running the Hadoop client on AIX. (atm)
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1503811&r1=1503810&r2=1503811&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java Tue Jul 16 17:59:39 2013
@@ -18,6 +18,8 @@
package org.apache.hadoop.ipc;
+import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
+
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
@@ -63,7 +65,10 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
+import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcRequestMessageWrapper;
+import org.apache.hadoop.ipc.RPC.RpcKind;
import org.apache.hadoop.ipc.Server.AuthProtocol;
+import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
@@ -833,17 +838,20 @@ public class Client {
AuthMethod authMethod)
throws IOException {
// Write out the ConnectionHeader
- DataOutputBuffer buf = new DataOutputBuffer();
- ProtoUtil.makeIpcConnectionContext(
+ IpcConnectionContextProto message = ProtoUtil.makeIpcConnectionContext(
RPC.getProtocolName(remoteId.getProtocol()),
remoteId.getTicket(),
- authMethod).writeTo(buf);
+ authMethod);
+ RpcRequestHeaderProto connectionContextHeader =
+ ProtoUtil.makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
+ OperationProto.RPC_FINAL_PACKET, CONNECTION_CONTEXT_CALL_ID,
+ clientId);
+ RpcRequestMessageWrapper request =
+ new RpcRequestMessageWrapper(connectionContextHeader, message);
// Write out the packet length
- int bufLen = buf.getLength();
-
- out.writeInt(bufLen);
- out.write(buf.getData(), 0, bufLen);
+ out.writeInt(request.getLength());
+ request.write(out);
}
/* wait till someone signals us to start reading RPC response or
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java?rev=1503811&r1=1503810&r2=1503811&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java Tue Jul 16 17:59:39 2013
@@ -32,6 +32,8 @@ public class RpcConstants {
public static final byte[] DUMMY_CLIENT_ID = new byte[0];
public static final int INVALID_CALL_ID = -2;
+
+ public static final int CONNECTION_CONTEXT_CALL_ID = -3;
/**
* The first four bytes of Hadoop RPC connections
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1503811&r1=1503810&r2=1503811&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Tue Jul 16 17:59:39 2013
@@ -73,7 +73,7 @@ import org.apache.hadoop.io.DataOutputBu
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import static org.apache.hadoop.ipc.RpcConstants.CURRENT_VERSION;
-import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcRequestMessageWrapper;
+import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseWrapper;
import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.RPC.VersionMismatch;
@@ -110,6 +110,7 @@ import com.google.common.annotations.Vis
import com.google.protobuf.ByteString;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.Message;
+import com.google.protobuf.Message.Builder;
/** An abstract IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on
@@ -776,9 +777,10 @@ public abstract class Server {
LOG.info(getName() + ": readAndProcess caught InterruptedException", ieo);
throw ieo;
} catch (Exception e) {
- LOG.info(getName() + ": readAndProcess threw exception " + e +
- " from client " + c.getHostAddress() +
- ". Count of bytes read: " + count, e);
+ // log stack trace for "interesting" exceptions not sent to client
+ LOG.info(getName() + ": readAndProcess from client " +
+ c.getHostAddress() + " threw exception [" + e + "]",
+ (e instanceof WrappedRpcServerException) ? null : e);
count = -1; //so that the (count < 0) block is executed
}
if (count < 0) {
@@ -1098,6 +1100,32 @@ public abstract class Server {
}
};
+ /**
+ * Wrapper for RPC IOExceptions to be returned to the client. Used to
+ * let exceptions bubble up to top of processOneRpc where the correct
+ * callId can be associated with the response. Also used to prevent
+ * unnecessary stack trace logging if it's not an internal server error.
+ */
+ @SuppressWarnings("serial")
+ private static class WrappedRpcServerException extends RpcServerException {
+ private final RpcErrorCodeProto errCode;
+ public WrappedRpcServerException(RpcErrorCodeProto errCode, IOException ioe) {
+ super(ioe.toString(), ioe);
+ this.errCode = errCode;
+ }
+ public WrappedRpcServerException(RpcErrorCodeProto errCode, String message) {
+ this(errCode, new RpcServerException(message));
+ }
+ @Override
+ public RpcErrorCodeProto getRpcErrorCodeProto() {
+ return errCode;
+ }
+ @Override
+ public String toString() {
+ return getCause().toString();
+ }
+ }
+
/** Reads calls from a connection and queues them for handling. */
public class Connection {
private boolean connectionHeaderRead = false; // connection header is read?
@@ -1135,6 +1163,7 @@ public abstract class Server {
// Fake 'call' for failed authorization response
private static final int AUTHORIZATION_FAILED_CALLID = -1;
+
private final Call authFailedCall =
new Call(AUTHORIZATION_FAILED_CALLID, null, this);
private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream();
@@ -1215,7 +1244,7 @@ public abstract class Server {
}
private UserGroupInformation getAuthorizedUgi(String authorizedId)
- throws IOException {
+ throws InvalidToken, AccessControlException {
if (authMethod == AuthMethod.TOKEN) {
TokenIdentifier tokenId = SaslRpcServer.getIdentifier(authorizedId,
secretManager);
@@ -1231,12 +1260,17 @@ public abstract class Server {
}
}
- private void saslReadAndProcess(byte[] saslToken) throws IOException,
- InterruptedException {
- if (!saslContextEstablished) {
- RpcSaslProto saslResponse;
+ private RpcSaslProto saslReadAndProcess(DataInputStream dis) throws
+ WrappedRpcServerException, InterruptedException {
+ if (saslContextEstablished) {
+ throw new WrappedRpcServerException(
+ RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
+ new SaslException("Negotiation is already complete"));
+ }
+ RpcSaslProto saslResponse = null;
+ try {
try {
- saslResponse = processSaslMessage(saslToken);
+ saslResponse = processSaslMessage(dis);
} catch (IOException e) {
IOException sendToClient = e;
Throwable cause = e;
@@ -1252,9 +1286,7 @@ public abstract class Server {
// attempting user could be null
AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser +
" (" + e.getLocalizedMessage() + ")");
- // wait to send response until failure is logged
- doSaslReply(sendToClient);
- throw e;
+ throw sendToClient;
}
if (saslServer != null && saslServer.isComplete()) {
@@ -1272,37 +1304,19 @@ public abstract class Server {
AUDITLOG.info(AUTH_SUCCESSFUL_FOR + user);
saslContextEstablished = true;
}
- // send reply here to avoid a successful auth being logged as a
- // failure if response can't be sent
- doSaslReply(saslResponse);
- } else {
- if (LOG.isDebugEnabled())
- LOG.debug("Have read input token of size " + saslToken.length
- + " for processing by saslServer.unwrap()");
-
- if (!useWrap) {
- processOneRpc(saslToken);
- } else {
- byte[] plaintextData = saslServer.unwrap(saslToken, 0,
- saslToken.length);
- processUnwrappedData(plaintextData);
- }
+ } catch (WrappedRpcServerException wrse) { // don't re-wrap
+ throw wrse;
+ } catch (IOException ioe) {
+ throw new WrappedRpcServerException(
+ RpcErrorCodeProto.FATAL_UNAUTHORIZED, ioe);
}
+ return saslResponse;
}
- private RpcSaslProto processSaslMessage(byte[] buf)
+ private RpcSaslProto processSaslMessage(DataInputStream dis)
throws IOException, InterruptedException {
- final DataInputStream dis =
- new DataInputStream(new ByteArrayInputStream(buf));
- RpcRequestMessageWrapper requestWrapper = new RpcRequestMessageWrapper();
- requestWrapper.readFields(dis);
-
- final RpcRequestHeaderProto rpcHeader = requestWrapper.requestHeader;
- if (rpcHeader.getCallId() != AuthProtocol.SASL.callId) {
- throw new SaslException("Client sent non-SASL request");
- }
final RpcSaslProto saslMessage =
- RpcSaslProto.parseFrom(requestWrapper.theRequestRead);
+ decodeProtobufFromStream(RpcSaslProto.newBuilder(), dis);
RpcSaslProto saslResponse = null;
final SaslState state = saslMessage.getState(); // required
switch (state) {
@@ -1352,8 +1366,7 @@ public abstract class Server {
return saslResponse;
}
- private RpcSaslProto buildSaslResponse(SaslState state, byte[] replyToken)
- throws IOException {
+ private RpcSaslProto buildSaslResponse(SaslState state, byte[] replyToken) {
if (LOG.isDebugEnabled()) {
LOG.debug("Will send " + state + " token of size "
+ ((replyToken != null) ? replyToken.length : null)
@@ -1367,8 +1380,7 @@ public abstract class Server {
return response.build();
}
- private void doSaslReply(Message message)
- throws IOException {
+ private void doSaslReply(Message message) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Sending sasl message "+message);
}
@@ -1481,16 +1493,7 @@ public abstract class Server {
dataLengthBuffer.clear();
data.flip();
boolean isHeaderRead = connectionContextRead;
- if (authProtocol == AuthProtocol.SASL) {
- // switch to simple must ignore next negotiate or initiate
- if (skipInitialSaslHandshake) {
- authProtocol = AuthProtocol.NONE;
- } else {
- saslReadAndProcess(data.array());
- }
- } else {
- processOneRpc(data.array());
- }
+ processRpcRequestPacket(data.array());
data = null;
if (!isHeaderRead) {
continue;
@@ -1525,6 +1528,7 @@ public abstract class Server {
// switch to simple hack, but don't switch if other auths are
// supported, ex. tokens
if (isSimpleEnabled && enabledAuthMethods.size() == 1) {
+ authProtocol = AuthProtocol.NONE;
skipInitialSaslHandshake = true;
doSaslReply(buildSaslResponse(SaslState.SUCCESS, null));
}
@@ -1624,11 +1628,21 @@ public abstract class Server {
responder.doRespond(fakeCall);
}
- /** Reads the connection context following the connection header */
- private void processConnectionContext(byte[] buf) throws IOException {
- DataInputStream in =
- new DataInputStream(new ByteArrayInputStream(buf));
- connectionContext = IpcConnectionContextProto.parseFrom(in);
+ /** Reads the connection context following the connection header
+ * @param dis - DataInputStream from which to read the header
+ * @throws WrappedRpcServerException - if the header cannot be
+ * deserialized, or the user is not authorized
+ */
+ private void processConnectionContext(DataInputStream dis)
+ throws WrappedRpcServerException {
+ // allow only one connection context during a session
+ if (connectionContextRead) {
+ throw new WrappedRpcServerException(
+ RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
+ "Connection context already processed");
+ }
+ connectionContext = decodeProtobufFromStream(
+ IpcConnectionContextProto.newBuilder(), dis);
protocolName = connectionContext.hasProtocol() ? connectionContext
.getProtocol() : null;
@@ -1645,9 +1659,11 @@ public abstract class Server {
&& (!protocolUser.getUserName().equals(user.getUserName()))) {
if (authMethod == AuthMethod.TOKEN) {
// Not allowed to doAs if token authentication is used
- throw new AccessControlException("Authenticated user (" + user
- + ") doesn't match what the client claims to be ("
- + protocolUser + ")");
+ throw new WrappedRpcServerException(
+ RpcErrorCodeProto.FATAL_UNAUTHORIZED,
+ new AccessControlException("Authenticated user (" + user
+ + ") doesn't match what the client claims to be ("
+ + protocolUser + ")"));
} else {
// Effective user can be different from authenticated user
// for simple auth or kerberos auth
@@ -1658,9 +1674,34 @@ public abstract class Server {
}
}
}
+ authorizeConnection();
+ // don't set until after authz because connection isn't established
+ connectionContextRead = true;
+ }
+
+ /**
+ * Process a RPC Request - if SASL wrapping is enabled, unwrap the
+ * requests and process each one, else directly process the request
+ * @param buf - single request or SASL wrapped requests
+ * @throws IOException - connection failed to authenticate or authorize,
+ * or the request could not be decoded into a Call
+ * @throws InterruptedException
+ */
+ private void processRpcRequestPacket(byte[] buf) throws IOException,
+ InterruptedException {
+ if (saslContextEstablished && useWrap) {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Have read input token of size " + buf.length
+ + " for processing by saslServer.unwrap()");
+ final byte[] plaintextData = saslServer.unwrap(buf, 0, buf.length);
+ // loops over decoded data and calls processOneRpc
+ unwrapPacketAndProcessRpcs(plaintextData);
+ } else {
+ processOneRpc(buf);
+ }
}
- private void processUnwrappedData(byte[] inBuf) throws IOException,
+ private void unwrapPacketAndProcessRpcs(byte[] inBuf) throws IOException,
InterruptedException {
ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(
inBuf));
@@ -1699,61 +1740,93 @@ public abstract class Server {
}
}
- private void processOneRpc(byte[] buf) throws IOException,
- InterruptedException {
- if (connectionContextRead) {
- processRpcRequest(buf);
- } else {
- processConnectionContext(buf);
- connectionContextRead = true;
- if (!authorizeConnection()) {
- throw new AccessControlException("Connection from " + this
- + " for protocol " + connectionContext.getProtocol()
- + " is unauthorized for user " + user);
+ /**
+ * Process an RPC Request - handle connection setup and decoding of
+ * request into a Call
+ * @param buf - contains the RPC request header and the rpc request
+ * @throws IOException - internal error that should not be returned to
+ * client, typically failure to respond to client
+ * @throws WrappedRpcServerException - an exception to be sent back to
+ * the client that does not require verbose logging by the
+ * Listener thread
+ * @throws InterruptedException
+ */
+ private void processOneRpc(byte[] buf)
+ throws IOException, WrappedRpcServerException, InterruptedException {
+ int callId = -1;
+ try {
+ final DataInputStream dis =
+ new DataInputStream(new ByteArrayInputStream(buf));
+ final RpcRequestHeaderProto header =
+ decodeProtobufFromStream(RpcRequestHeaderProto.newBuilder(), dis);
+ callId = header.getCallId();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(" got #" + callId);
}
+ checkRpcHeaders(header);
+
+ if (callId < 0) { // callIds typically used during connection setup
+ processRpcOutOfBandRequest(header, dis);
+ } else if (!connectionContextRead) {
+ throw new WrappedRpcServerException(
+ RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
+ "Connection context not established");
+ } else {
+ processRpcRequest(header, dis);
+ }
+ } catch (WrappedRpcServerException wrse) { // inform client of error
+ Throwable ioe = wrse.getCause();
+ final Call call = new Call(callId, null, this);
+ setupResponse(authFailedResponse, call,
+ RpcStatusProto.FATAL, wrse.getRpcErrorCodeProto(), null,
+ ioe.getClass().getName(), ioe.getMessage());
+ responder.doRespond(call);
+ throw wrse;
}
}
-
+
/**
- * Process an RPC Request - the connection headers and context have been
- * read
- * @param buf - contains the RPC request header and the rpc request
- * @throws RpcServerException due to fatal rpc layer issues such as
- * invalid header. In this case a RPC fatal status response is sent back
- * to client.
+ * Verify RPC header is valid
+ * @param header - RPC request header
+ * @throws WrappedRpcServerException - header contains invalid values
*/
-
- private void processRpcRequest(byte[] buf)
- throws RpcServerException, IOException, InterruptedException {
- DataInputStream dis =
- new DataInputStream(new ByteArrayInputStream(buf));
- RpcRequestHeaderProto header = RpcRequestHeaderProto.parseDelimitedFrom(dis);
-
- if (LOG.isDebugEnabled())
- LOG.debug(" got #" + header.getCallId());
+ private void checkRpcHeaders(RpcRequestHeaderProto header)
+ throws WrappedRpcServerException {
if (!header.hasRpcOp()) {
String err = " IPC Server: No rpc op in rpcRequestHeader";
- respondBadRpcHeader(new Call(header.getCallId(), null, this),
- RpcServerException.class.getName(), err);
- throw new RpcServerException(err);
+ throw new WrappedRpcServerException(
+ RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);
}
if (header.getRpcOp() !=
RpcRequestHeaderProto.OperationProto.RPC_FINAL_PACKET) {
String err = "IPC Server does not implement rpc header operation" +
header.getRpcOp();
- respondBadRpcHeader(new Call(header.getCallId(), null, this),
- RpcServerException.class.getName(), err);
- throw new RpcServerException(err);
+ throw new WrappedRpcServerException(
+ RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);
}
// If we know the rpc kind, get its class so that we can deserialize
// (Note it would make more sense to have the handler deserialize but
// we continue with this original design.
if (!header.hasRpcKind()) {
String err = " IPC Server: No rpc kind in rpcRequestHeader";
- respondBadRpcHeader(new Call(header.getCallId(), null, this),
- RpcServerException.class.getName(), err);
- throw new RpcServerException(err);
+ throw new WrappedRpcServerException(
+ RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);
}
+ }
+
+ /**
+ * Process an RPC Request - the connection headers and context must
+ * have been already read
+ * @param header - RPC request header
+ * @param dis - stream to request payload
+ * @throws WrappedRpcServerException - due to fatal rpc layer issues such
+ * as invalid header or deserialization error. In this case a RPC fatal
+ * status response will later be sent back to client.
+ * @throws InterruptedException
+ */
+ private void processRpcRequest(RpcRequestHeaderProto header,
+ DataInputStream dis) throws WrappedRpcServerException,
+ InterruptedException {
Class<? extends Writable> rpcRequestClass =
getRpcRequestWrapper(header.getRpcKind());
if (rpcRequestClass == null) {
@@ -1761,9 +1834,8 @@ public abstract class Server {
" from client " + getHostAddress());
final String err = "Unknown rpc kind in rpc header" +
header.getRpcKind();
- respondBadRpcHeader(new Call(header.getCallId(), null, this),
- RpcServerException.class.getName(), err);
- throw new RpcServerException(err);
+ throw new WrappedRpcServerException(
+ RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);
}
Writable rpcRequest;
try { //Read the rpc request
@@ -1773,17 +1845,9 @@ public abstract class Server {
LOG.warn("Unable to read call parameters for client " +
getHostAddress() + "on connection protocol " +
this.protocolName + " for rpcKind " + header.getRpcKind(), t);
- final Call readParamsFailedCall =
- new Call(header.getCallId(), null, this);
- ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
String err = "IPC server unable to read call parameters: "+ t.getMessage();
-
- setupResponse(responseBuffer, readParamsFailedCall,
- RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST,
- null, t.getClass().getName(),
- err);
- responder.doRespond(readParamsFailedCall);
- throw new RpcServerException(err, t);
+ throw new WrappedRpcServerException(
+ RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST, err);
}
Call call = new Call(header.getCallId(), rpcRequest, this,
@@ -1793,7 +1857,59 @@ public abstract class Server {
incRpcCount(); // Increment the rpc count
}
- private boolean authorizeConnection() throws IOException {
+
+ /**
+ * Establish RPC connection setup by negotiating SASL if required, then
+ * reading and authorizing the connection header
+ * @param header - RPC header
+ * @param dis - stream to request payload
+ * @throws WrappedRpcServerException - setup failed due to SASL
+ * negotiation failure, premature or invalid connection context,
+ * or other state errors
+ * @throws IOException - failed to send a response back to the client
+ * @throws InterruptedException
+ */
+ private void processRpcOutOfBandRequest(RpcRequestHeaderProto header,
+ DataInputStream dis) throws WrappedRpcServerException, IOException,
+ InterruptedException {
+ final int callId = header.getCallId();
+ if (callId == CONNECTION_CONTEXT_CALL_ID) {
+ // SASL must be established prior to connection context
+ if (authProtocol == AuthProtocol.SASL && !saslContextEstablished) {
+ throw new WrappedRpcServerException(
+ RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
+ "Connection header sent during SASL negotiation");
+ }
+ // read and authorize the user
+ processConnectionContext(dis);
+ } else if (callId == AuthProtocol.SASL.callId) {
+ // if client was switched to simple, ignore first SASL message
+ if (authProtocol != AuthProtocol.SASL) {
+ if (!skipInitialSaslHandshake) {
+ throw new WrappedRpcServerException(
+ RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
+ "SASL protocol not requested by client");
+ }
+ skipInitialSaslHandshake = false;
+ return;
+ }
+ RpcSaslProto response = saslReadAndProcess(dis);
+ // send back response if any, may throw IOException
+ if (response != null) {
+ doSaslReply(response);
+ }
+ } else {
+ throw new WrappedRpcServerException(
+ RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
+ "Unknown out of band call #" + callId);
+ }
+ }
+
+ /**
+ * Authorize proxy users to access this server
+ * @throws WrappedRpcServerException - user is not allowed to proxy
+ */
+ private void authorizeConnection() throws WrappedRpcServerException {
try {
// If auth method is DIGEST, the token was obtained by the
// real user for the effective user, therefore not required to
@@ -1809,17 +1925,37 @@ public abstract class Server {
}
rpcMetrics.incrAuthorizationSuccesses();
} catch (AuthorizationException ae) {
+ LOG.info("Connection from " + this
+ + " for protocol " + connectionContext.getProtocol()
+ + " is unauthorized for user " + user);
rpcMetrics.incrAuthorizationFailures();
- setupResponse(authFailedResponse, authFailedCall,
- RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_UNAUTHORIZED, null,
- ae.getClass().getName(), ae.getMessage());
- responder.doRespond(authFailedCall);
- return false;
+ throw new WrappedRpcServerException(
+ RpcErrorCodeProto.FATAL_UNAUTHORIZED, ae);
}
- return true;
}
/**
+ * Decode a protobuf from the given input stream
+ * @param builder - Builder of the protobuf to decode
+ * @param dis - DataInputStream to read the protobuf
+ * @return Message - decoded protobuf
+ * @throws WrappedRpcServerException - deserialization failed
+ */
+ @SuppressWarnings("unchecked")
+ private <T extends Message> T decodeProtobufFromStream(Builder builder,
+ DataInputStream dis) throws WrappedRpcServerException {
+ try {
+ builder.mergeDelimitedFrom(dis);
+ return (T)builder.build();
+ } catch (Exception ioe) {
+ Class<?> protoClass = builder.getDefaultInstanceForType().getClass();
+ throw new WrappedRpcServerException(
+ RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST,
+ "Error decoding " + protoClass.getSimpleName() + ": "+ ioe);
+ }
+ }
+
+ /**
* Get service class for connection
* @return the serviceClass
*/
@@ -2223,18 +2359,6 @@ public abstract class Server {
}
- private void respondBadRpcHeader(Call call, String errorClass, String error)
- throws IOException
- {
- ByteArrayOutputStream responseBuf = new ByteArrayOutputStream();
- setupResponse(responseBuf, call,
- RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
- null, errorClass, error);
- responder.doRespond(call);
- return;
-
- }
-
private void wrapWithSasl(ByteArrayOutputStream response, Call call)
throws IOException {
if (call.connection.saslServer != null) {
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java?rev=1503811&r1=1503810&r2=1503811&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java Tue Jul 16 17:59:39 2013
@@ -312,7 +312,7 @@ public class TestSaslRPC {
doDigestRpc(server, sm);
} catch (RemoteException e) {
LOG.info("LOGGING MESSAGE: " + e.getLocalizedMessage());
- assertTrue(ERROR_MESSAGE.equals(e.getLocalizedMessage()));
+ assertEquals(ERROR_MESSAGE, e.getLocalizedMessage());
assertTrue(e.unwrapRemoteException() instanceof InvalidToken);
succeeded = true;
}
@@ -818,6 +818,7 @@ public class TestSaslRPC {
}
try {
+ LOG.info("trying ugi:"+clientUgi+" tokens:"+clientUgi.getTokens());
return clientUgi.doAs(new PrivilegedExceptionAction<String>() {
@Override
public String run() throws IOException {