You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2013/08/12 23:26:02 UTC
svn commit: r1513258 [5/9] - in
/hadoop/common/branches/YARN-321/hadoop-common-project: ./
hadoop-annotations/ hadoop-auth-examples/
hadoop-auth-examples/src/main/webapp/
hadoop-auth-examples/src/main/webapp/annonymous/
hadoop-auth-examples/src/main/we...
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Mon Aug 12 21:25:49 2013
@@ -71,7 +71,9 @@ import org.apache.hadoop.fs.CommonConfig
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcRequestMessageWrapper;
+import static org.apache.hadoop.ipc.RpcConstants.*;
+
+import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseMessageWrapper;
import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseWrapper;
import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.RPC.VersionMismatch;
@@ -84,6 +86,7 @@ import org.apache.hadoop.ipc.protobuf.Rp
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcSaslProto;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcSaslProto.SaslAuth;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcSaslProto.SaslState;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
@@ -108,6 +111,7 @@ import com.google.common.annotations.Vis
import com.google.protobuf.ByteString;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.Message;
+import com.google.protobuf.Message.Builder;
/** An abstract IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on
@@ -155,11 +159,7 @@ public abstract class Server {
return terseExceptions.contains(t.toString());
}
}
-
- /**
- * The first four bytes of Hadoop RPC connections
- */
- public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
+
/**
* If the user accidentally sends an HTTP GET to an IPC port, we detect this
@@ -177,17 +177,6 @@ public abstract class Server {
"Content-type: text/plain\r\n\r\n" +
"It looks like you are making an HTTP request to a Hadoop IPC port. " +
"This is not the correct port for the web interface on this daemon.\r\n";
-
- // 1 : Introduce ping and server does not throw away RPCs
- // 3 : Introduce the protocol into the RPC connection header
- // 4 : Introduced SASL security layer
- // 5 : Introduced use of {@link ArrayPrimitiveWritable$Internal}
- // in ObjectWritable to efficiently transmit arrays of primitives
- // 6 : Made RPC Request header explicit
- // 7 : Changed Ipc Connection Header to use Protocol buffers
- // 8 : SASL server always sends a final response
- // 9 : Changes to protocol for HADOOP-8990
- public static final byte CURRENT_VERSION = 9;
/**
* Initial and max size of response buffer
@@ -281,16 +270,50 @@ public abstract class Server {
*/
private static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();
+ /** Get the current call */
+ @VisibleForTesting
+ public static ThreadLocal<Call> getCurCall() {
+ return CurCall;
+ }
+
+ /**
+ * Returns the currently active RPC call's sequential ID number. A negative
+ * call ID indicates an invalid value, such as if there is no currently active
+ * RPC call.
+ *
+ * @return int sequential ID number of currently active RPC call
+ */
+ public static int getCallId() {
+ Call call = CurCall.get();
+ return call != null ? call.callId : RpcConstants.INVALID_CALL_ID;
+ }
+
+ /**
+ * @return The currently active RPC call's retry count. -1 indicates that the
+ * client does not support the retry cache.
+ */
+ public static int getCallRetryCount() {
+ Call call = CurCall.get();
+ return call != null ? call.retryCount : RpcConstants.INVALID_RETRY_COUNT;
+ }
+
/** Returns the remote side ip address when invoked inside an RPC
* Returns null incase of an error.
*/
public static InetAddress getRemoteIp() {
Call call = CurCall.get();
- if (call != null) {
- return call.connection.getHostInetAddress();
- }
- return null;
+ return (call != null && call.connection != null) ? call.connection
+ .getHostInetAddress() : null;
}
+
+ /**
+ * Returns the clientId from the current RPC request
+ */
+ public static byte[] getClientId() {
+ Call call = CurCall.get();
+ return call != null ? call.clientId : RpcConstants.DUMMY_CLIENT_ID;
+ }
+
/** Returns remote address as a string when invoked inside an RPC.
* Returns null in case of an error.
*/
@@ -305,7 +328,8 @@ public abstract class Server {
*/
public static UserGroupInformation getRemoteUser() {
Call call = CurCall.get();
- return (call != null) ? call.connection.user : null;
+ return (call != null && call.connection != null) ? call.connection.user
+ : null;
}
/** Return true if the invocation was through an RPC.
@@ -443,30 +467,39 @@ public abstract class Server {
}
/** A call queued for handling. */
- private static class Call {
+ public static class Call {
private final int callId; // the client's call id
+ private final int retryCount; // the retry count of the call
private final Writable rpcRequest; // Serialized Rpc request from client
private final Connection connection; // connection to client
private long timestamp; // time received when response is null
// time served when response is not null
private ByteBuffer rpcResponse; // the response for this call
private final RPC.RpcKind rpcKind;
+ private final byte[] clientId;
- public Call(int id, Writable param, Connection connection) {
- this( id, param, connection, RPC.RpcKind.RPC_BUILTIN );
+ public Call(int id, int retryCount, Writable param,
+ Connection connection) {
+ this(id, retryCount, param, connection, RPC.RpcKind.RPC_BUILTIN,
+ RpcConstants.DUMMY_CLIENT_ID);
}
- public Call(int id, Writable param, Connection connection, RPC.RpcKind kind) {
+
+ public Call(int id, int retryCount, Writable param, Connection connection,
+ RPC.RpcKind kind, byte[] clientId) {
this.callId = id;
+ this.retryCount = retryCount;
this.rpcRequest = param;
this.connection = connection;
this.timestamp = Time.now();
this.rpcResponse = null;
this.rpcKind = kind;
+ this.clientId = clientId;
}
@Override
public String toString() {
- return rpcRequest.toString() + " from " + connection.toString();
+ return rpcRequest + " from " + connection + " Call#" + callId + " Retry#"
+ + retryCount;
}
public void setResponse(ByteBuffer response) {
@@ -761,9 +794,13 @@ public abstract class Server {
LOG.info(getName() + ": readAndProcess caught InterruptedException", ieo);
throw ieo;
} catch (Exception e) {
- LOG.info(getName() + ": readAndProcess threw exception " + e +
- " from client " + c.getHostAddress() +
- ". Count of bytes read: " + count, e);
+ // a WrappedRpcServerException is an exception that has been sent
+ // to the client, so the stacktrace is unnecessary; any other
+ // exceptions are unexpected internal server errors and thus the
+ // stacktrace should be logged
+ LOG.info(getName() + ": readAndProcess from client " +
+ c.getHostAddress() + " threw exception [" + e + "]",
+ (e instanceof WrappedRpcServerException) ? null : e);
count = -1; //so that the (count < 0) block is executed
}
if (count < 0) {
@@ -882,11 +919,7 @@ public abstract class Server {
}
for(Call call : calls) {
- try {
- doPurge(call, now);
- } catch (IOException e) {
- LOG.warn("Error in purging old calls " + e);
- }
+ doPurge(call, now);
}
} catch (OutOfMemoryError e) {
//
@@ -931,7 +964,7 @@ public abstract class Server {
// Remove calls that have been pending in the responseQueue
// for a long time.
//
- private void doPurge(Call call, long now) throws IOException {
+ private void doPurge(Call call, long now) {
LinkedList<Call> responseQueue = call.connection.responseQueue;
synchronized (responseQueue) {
Iterator<Call> iter = responseQueue.listIterator(0);
@@ -970,8 +1003,7 @@ public abstract class Server {
call = responseQueue.removeFirst();
SocketChannel channel = call.connection.channel;
if (LOG.isDebugEnabled()) {
- LOG.debug(getName() + ": responding to #" + call.callId + " from " +
- call.connection);
+ LOG.debug(getName() + ": responding to " + call);
}
//
// Send as much data as we can in the non-blocking fashion
@@ -990,8 +1022,8 @@ public abstract class Server {
done = false; // more calls pending to be sent.
}
if (LOG.isDebugEnabled()) {
- LOG.debug(getName() + ": responding to #" + call.callId + " from " +
- call.connection + " Wrote " + numBytes + " bytes.");
+ LOG.debug(getName() + ": responding to " + call
+ + " Wrote " + numBytes + " bytes.");
}
} else {
//
@@ -1018,9 +1050,8 @@ public abstract class Server {
}
}
if (LOG.isDebugEnabled()) {
- LOG.debug(getName() + ": responding to #" + call.callId + " from " +
- call.connection + " Wrote partial " + numBytes +
- " bytes.");
+ LOG.debug(getName() + ": responding to " + call
+ + " Wrote partial " + numBytes + " bytes.");
}
}
error = false; // everything went off well
@@ -1083,6 +1114,32 @@ public abstract class Server {
}
};
+ /**
+ * Wrapper for RPC IOExceptions to be returned to the client. Used to
+ * let exceptions bubble up to top of processOneRpc where the correct
+ * callId can be associated with the response. Also used to prevent
+ * unnecessary stack trace logging if it's not an internal server error.
+ */
+ @SuppressWarnings("serial")
+ private static class WrappedRpcServerException extends RpcServerException {
+ private final RpcErrorCodeProto errCode;
+ public WrappedRpcServerException(RpcErrorCodeProto errCode, IOException ioe) {
+ super(ioe.toString(), ioe);
+ this.errCode = errCode;
+ }
+ public WrappedRpcServerException(RpcErrorCodeProto errCode, String message) {
+ this(errCode, new RpcServerException(message));
+ }
+ @Override
+ public RpcErrorCodeProto getRpcErrorCodeProto() {
+ return errCode;
+ }
+ @Override
+ public String toString() {
+ return getCause().toString();
+ }
+ }
+
/** Reads calls from a connection and queues them for handling. */
public class Connection {
private boolean connectionHeaderRead = false; // connection header is read?
@@ -1109,7 +1166,6 @@ public abstract class Server {
private AuthMethod authMethod;
private AuthProtocol authProtocol;
private boolean saslContextEstablished;
- private boolean skipInitialSaslHandshake;
private ByteBuffer connectionHeaderBuf = null;
private ByteBuffer unwrappedData;
private ByteBuffer unwrappedDataLengthBuffer;
@@ -1119,12 +1175,12 @@ public abstract class Server {
public UserGroupInformation attemptingUser = null; // user name before auth
// Fake 'call' for failed authorization response
- private static final int AUTHORIZATION_FAILED_CALLID = -1;
- private final Call authFailedCall =
- new Call(AUTHORIZATION_FAILED_CALLID, null, this);
+ private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALL_ID,
+ RpcConstants.INVALID_RETRY_COUNT, null, this);
private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream();
- private final Call saslCall = new Call(AuthProtocol.SASL.callId, null, this);
+ private final Call saslCall = new Call(AuthProtocol.SASL.callId,
+ RpcConstants.INVALID_RETRY_COUNT, null, this);
private final ByteArrayOutputStream saslResponse = new ByteArrayOutputStream();
private boolean sentNegotiate = false;
@@ -1200,7 +1256,7 @@ public abstract class Server {
}
private UserGroupInformation getAuthorizedUgi(String authorizedId)
- throws IOException {
+ throws InvalidToken, AccessControlException {
if (authMethod == AuthMethod.TOKEN) {
TokenIdentifier tokenId = SaslRpcServer.getIdentifier(authorizedId,
secretManager);
@@ -1216,12 +1272,37 @@ public abstract class Server {
}
}
- private void saslReadAndProcess(byte[] saslToken) throws IOException,
- InterruptedException {
- if (!saslContextEstablished) {
- RpcSaslProto saslResponse;
+ private void saslReadAndProcess(DataInputStream dis) throws
+ WrappedRpcServerException, IOException, InterruptedException {
+ final RpcSaslProto saslMessage =
+ decodeProtobufFromStream(RpcSaslProto.newBuilder(), dis);
+ switch (saslMessage.getState()) {
+ case WRAP: {
+ if (!saslContextEstablished || !useWrap) {
+ throw new WrappedRpcServerException(
+ RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
+ new SaslException("Server is not wrapping data"));
+ }
+ // loops over decoded data and calls processOneRpc
+ unwrapPacketAndProcessRpcs(saslMessage.getToken().toByteArray());
+ break;
+ }
+ default:
+ saslProcess(saslMessage);
+ }
+ }
+
+ private void saslProcess(RpcSaslProto saslMessage)
+ throws WrappedRpcServerException, IOException, InterruptedException {
+ if (saslContextEstablished) {
+ throw new WrappedRpcServerException(
+ RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
+ new SaslException("Negotiation is already complete"));
+ }
+ RpcSaslProto saslResponse = null;
+ try {
try {
- saslResponse = processSaslMessage(saslToken);
+ saslResponse = processSaslMessage(saslMessage);
} catch (IOException e) {
IOException sendToClient = e;
Throwable cause = e;
@@ -1237,9 +1318,7 @@ public abstract class Server {
// attempting user could be null
AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser +
" (" + e.getLocalizedMessage() + ")");
- // wait to send response until failure is logged
- doSaslReply(sendToClient);
- throw e;
+ throw sendToClient;
}
if (saslServer != null && saslServer.isComplete()) {
@@ -1247,8 +1326,6 @@ public abstract class Server {
LOG.debug("SASL server context established. Negotiated QoP is "
+ saslServer.getNegotiatedProperty(Sasl.QOP));
}
- String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
- useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
user = getAuthorizedUgi(saslServer.getAuthorizationID());
if (LOG.isDebugEnabled()) {
LOG.debug("SASL server successfully authenticated client: " + user);
@@ -1257,37 +1334,27 @@ public abstract class Server {
AUDITLOG.info(AUTH_SUCCESSFUL_FOR + user);
saslContextEstablished = true;
}
- // send reply here to avoid a successful auth being logged as a
- // failure if response can't be sent
+ } catch (WrappedRpcServerException wrse) { // don't re-wrap
+ throw wrse;
+ } catch (IOException ioe) {
+ throw new WrappedRpcServerException(
+ RpcErrorCodeProto.FATAL_UNAUTHORIZED, ioe);
+ }
+ // send back response if any, may throw IOException
+ if (saslResponse != null) {
doSaslReply(saslResponse);
- } else {
- if (LOG.isDebugEnabled())
- LOG.debug("Have read input token of size " + saslToken.length
- + " for processing by saslServer.unwrap()");
-
- if (!useWrap) {
- processOneRpc(saslToken);
- } else {
- byte[] plaintextData = saslServer.unwrap(saslToken, 0,
- saslToken.length);
- processUnwrappedData(plaintextData);
- }
+ }
+ // do NOT enable wrapping until the last auth response is sent
+ if (saslContextEstablished) {
+ String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
+ // SASL wrapping is only used if the connection has a QOP, and
+ // the value is not "auth", e.g. auth-int or auth-conf
+ useWrap = (qop != null && !"auth".equalsIgnoreCase(qop));
}
}
- private RpcSaslProto processSaslMessage(byte[] buf)
+ private RpcSaslProto processSaslMessage(RpcSaslProto saslMessage)
throws IOException, InterruptedException {
- final DataInputStream dis =
- new DataInputStream(new ByteArrayInputStream(buf));
- RpcRequestMessageWrapper requestWrapper = new RpcRequestMessageWrapper();
- requestWrapper.readFields(dis);
-
- final RpcRequestHeaderProto rpcHeader = requestWrapper.requestHeader;
- if (rpcHeader.getCallId() != AuthProtocol.SASL.callId) {
- throw new SaslException("Client sent non-SASL request");
- }
- final RpcSaslProto saslMessage =
- RpcSaslProto.parseFrom(requestWrapper.theRequestRead);
RpcSaslProto saslResponse = null;
final SaslState state = saslMessage.getState(); // required
switch (state) {
@@ -1297,23 +1364,39 @@ public abstract class Server {
"Client already attempted negotiation");
}
saslResponse = buildSaslNegotiateResponse();
+ // simple-only server negotiate response is success which client
+ // interprets as switch to simple
+ if (saslResponse.getState() == SaslState.SUCCESS) {
+ switchToSimple();
+ }
break;
}
case INITIATE: {
if (saslMessage.getAuthsCount() != 1) {
throw new SaslException("Client mechanism is malformed");
}
- String authMethodName = saslMessage.getAuths(0).getMethod();
- authMethod = createSaslServer(authMethodName);
- if (authMethod == null) { // the auth method is not supported
+ // verify the client requested an advertised authType
+ SaslAuth clientSaslAuth = saslMessage.getAuths(0);
+ if (!negotiateResponse.getAuthsList().contains(clientSaslAuth)) {
if (sentNegotiate) {
throw new AccessControlException(
- authMethodName + " authentication is not enabled."
+ clientSaslAuth.getMethod() + " authentication is not enabled."
+ " Available:" + enabledAuthMethods);
}
saslResponse = buildSaslNegotiateResponse();
break;
}
+ authMethod = AuthMethod.valueOf(clientSaslAuth.getMethod());
+ // abort SASL for SIMPLE auth; the server has already ensured above that
+ // SIMPLE is a legit option. We will send no response.
+ if (authMethod == AuthMethod.SIMPLE) {
+ switchToSimple();
+ break;
+ }
+ // sasl server for tokens may already be instantiated
+ if (saslServer == null || authMethod != AuthMethod.TOKEN) {
+ saslServer = createSaslServer(authMethod);
+ }
// fallthru to process sasl token
}
case RESPONSE: {
@@ -1336,9 +1419,14 @@ public abstract class Server {
}
return saslResponse;
}
+
+ private void switchToSimple() {
+ // disable SASL and blank out any SASL server
+ authProtocol = AuthProtocol.NONE;
+ saslServer = null;
+ }
- private RpcSaslProto buildSaslResponse(SaslState state, byte[] replyToken)
- throws IOException {
+ private RpcSaslProto buildSaslResponse(SaslState state, byte[] replyToken) {
if (LOG.isDebugEnabled()) {
LOG.debug("Will send " + state + " token of size "
+ ((replyToken != null) ? replyToken.length : null)
@@ -1352,8 +1440,7 @@ public abstract class Server {
return response.build();
}
- private void doSaslReply(Message message)
- throws IOException {
+ private void doSaslReply(Message message) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Sending sasl message "+message);
}
@@ -1394,7 +1481,8 @@ public abstract class Server {
}
}
- public int readAndProcess() throws IOException, InterruptedException {
+ public int readAndProcess()
+ throws WrappedRpcServerException, IOException, InterruptedException {
while (true) {
/* Read at most one RPC. If the header is not read completely yet
* then iterate until we read first RPC or until there is no data left.
@@ -1427,8 +1515,9 @@ public abstract class Server {
setupHttpRequestOnIpcPortResponse();
return -1;
}
-
- if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) {
+
+ if (!RpcConstants.HEADER.equals(dataLengthBuffer)
+ || version != CURRENT_VERSION) {
//Warning is ok since this is not supposed to happen.
LOG.warn("Incorrect header or version mismatch from " +
hostAddress + ":" + remotePort +
@@ -1450,11 +1539,6 @@ public abstract class Server {
if (data == null) {
dataLengthBuffer.flip();
dataLength = dataLengthBuffer.getInt();
- if ((dataLength == Client.PING_CALL_ID) && (!useWrap)) {
- // covers the !useSasl too
- dataLengthBuffer.clear();
- return 0; // ping message
- }
checkDataLength(dataLength);
data = ByteBuffer.allocate(dataLength);
}
@@ -1465,16 +1549,7 @@ public abstract class Server {
dataLengthBuffer.clear();
data.flip();
boolean isHeaderRead = connectionContextRead;
- if (authProtocol == AuthProtocol.SASL) {
- // switch to simple must ignore next negotiate or initiate
- if (skipInitialSaslHandshake) {
- authProtocol = AuthProtocol.NONE;
- } else {
- saslReadAndProcess(data.array());
- }
- } else {
- processOneRpc(data.array());
- }
+ processOneRpc(data.array());
data = null;
if (!isHeaderRead) {
continue;
@@ -1485,7 +1560,7 @@ public abstract class Server {
}
private AuthProtocol initializeAuthContext(int authType)
- throws IOException, InterruptedException {
+ throws IOException {
AuthProtocol authProtocol = AuthProtocol.valueOf(authType);
if (authProtocol == null) {
IOException ioe = new IpcException("Unknown auth protocol:" + authType);
@@ -1505,14 +1580,7 @@ public abstract class Server {
}
break;
}
- case SASL: {
- // switch to simple hack, but don't switch if other auths are
- // supported, ex. tokens
- if (isSimpleEnabled && enabledAuthMethods.size() == 1) {
- skipInitialSaslHandshake = true;
- doSaslReply(buildSaslResponse(SaslState.SUCCESS, null));
- }
- // else wait for a negotiate or initiate
+ default: {
break;
}
}
@@ -1537,25 +1605,6 @@ public abstract class Server {
return negotiateMessage;
}
- private AuthMethod createSaslServer(String authMethodName)
- throws IOException, InterruptedException {
- AuthMethod authMethod;
- try {
- authMethod = AuthMethod.valueOf(authMethodName);
- if (!enabledAuthMethods.contains(authMethod)) {
- authMethod = null;
- }
- } catch (IllegalArgumentException iae) {
- authMethod = null;
- }
- if (authMethod != null &&
- // sasl server for tokens may already be instantiated
- (saslServer == null || authMethod != AuthMethod.TOKEN)) {
- saslServer = createSaslServer(authMethod);
- }
- return authMethod;
- }
-
private SaslServer createSaslServer(AuthMethod authMethod)
throws IOException, InterruptedException {
return new SaslRpcServer(authMethod).create(this, secretManager);
@@ -1576,20 +1625,23 @@ public abstract class Server {
if (clientVersion >= 9) {
// Versions >>9 understand the normal response
- Call fakeCall = new Call(-1, null, this);
+ Call fakeCall = new Call(-1, RpcConstants.INVALID_RETRY_COUNT, null,
+ this);
setupResponse(buffer, fakeCall,
RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_VERSION_MISMATCH,
null, VersionMismatch.class.getName(), errMsg);
responder.doRespond(fakeCall);
} else if (clientVersion >= 3) {
- Call fakeCall = new Call(-1, null, this);
+ Call fakeCall = new Call(-1, RpcConstants.INVALID_RETRY_COUNT, null,
+ this);
// Versions 3 to 8 use older response
setupResponseOldVersionFatal(buffer, fakeCall,
null, VersionMismatch.class.getName(), errMsg);
responder.doRespond(fakeCall);
} else if (clientVersion == 2) { // Hadoop 0.18.3
- Call fakeCall = new Call(0, null, this);
+ Call fakeCall = new Call(0, RpcConstants.INVALID_RETRY_COUNT, null,
+ this);
DataOutputStream out = new DataOutputStream(buffer);
out.writeInt(0); // call ID
out.writeBoolean(true); // error
@@ -1602,17 +1654,27 @@ public abstract class Server {
}
private void setupHttpRequestOnIpcPortResponse() throws IOException {
- Call fakeCall = new Call(0, null, this);
+ Call fakeCall = new Call(0, RpcConstants.INVALID_RETRY_COUNT, null, this);
fakeCall.setResponse(ByteBuffer.wrap(
RECEIVED_HTTP_REQ_RESPONSE.getBytes()));
responder.doRespond(fakeCall);
}
- /** Reads the connection context following the connection header */
- private void processConnectionContext(byte[] buf) throws IOException {
- DataInputStream in =
- new DataInputStream(new ByteArrayInputStream(buf));
- connectionContext = IpcConnectionContextProto.parseFrom(in);
+ /** Reads the connection context following the connection header
+ * @param dis - DataInputStream from which to read the header
+ * @throws WrappedRpcServerException - if the header cannot be
+ * deserialized, or the user is not authorized
+ */
+ private void processConnectionContext(DataInputStream dis)
+ throws WrappedRpcServerException {
+ // allow only one connection context during a session
+ if (connectionContextRead) {
+ throw new WrappedRpcServerException(
+ RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
+ "Connection context already processed");
+ }
+ connectionContext = decodeProtobufFromStream(
+ IpcConnectionContextProto.newBuilder(), dis);
protocolName = connectionContext.hasProtocol() ? connectionContext
.getProtocol() : null;
@@ -1629,9 +1691,11 @@ public abstract class Server {
&& (!protocolUser.getUserName().equals(user.getUserName()))) {
if (authMethod == AuthMethod.TOKEN) {
// Not allowed to doAs if token authentication is used
- throw new AccessControlException("Authenticated user (" + user
- + ") doesn't match what the client claims to be ("
- + protocolUser + ")");
+ throw new WrappedRpcServerException(
+ RpcErrorCodeProto.FATAL_UNAUTHORIZED,
+ new AccessControlException("Authenticated user (" + user
+ + ") doesn't match what the client claims to be ("
+ + protocolUser + ")"));
} else {
// Effective user can be different from authenticated user
// for simple auth or kerberos auth
@@ -1642,10 +1706,25 @@ public abstract class Server {
}
}
}
+ authorizeConnection();
+ // don't set until after authz because connection isn't established
+ connectionContextRead = true;
}
- private void processUnwrappedData(byte[] inBuf) throws IOException,
- InterruptedException {
+ /**
+ * Process a wrapped RPC Request - unwrap the SASL packet and process
+ * each embedded RPC request
+ * @param inBuf - SASL wrapped request of one or more RPCs
+ * @throws IOException - SASL packet cannot be unwrapped
+ * @throws InterruptedException
+ */
+ private void unwrapPacketAndProcessRpcs(byte[] inBuf)
+ throws WrappedRpcServerException, IOException, InterruptedException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Have read input token of size " + inBuf.length
+ + " for processing by saslServer.unwrap()");
+ }
+ inBuf = saslServer.unwrap(inBuf, 0, inBuf.length);
ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(
inBuf));
// Read all RPCs contained in the inBuf, even partial ones
@@ -1660,13 +1739,6 @@ public abstract class Server {
if (unwrappedData == null) {
unwrappedDataLengthBuffer.flip();
int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
-
- if (unwrappedDataLength == Client.PING_CALL_ID) {
- if (LOG.isDebugEnabled())
- LOG.debug("Received ping message");
- unwrappedDataLengthBuffer.clear();
- continue; // ping message
- }
unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
}
@@ -1683,61 +1755,95 @@ public abstract class Server {
}
}
- private void processOneRpc(byte[] buf) throws IOException,
- InterruptedException {
- if (connectionContextRead) {
- processRpcRequest(buf);
- } else {
- processConnectionContext(buf);
- connectionContextRead = true;
- if (!authorizeConnection()) {
- throw new AccessControlException("Connection from " + this
- + " for protocol " + connectionContext.getProtocol()
- + " is unauthorized for user " + user);
+ /**
+ * Process an RPC Request - handle connection setup and decoding of
+ * request into a Call
+ * @param buf - contains the RPC request header and the rpc request
+ * @throws IOException - internal error that should not be returned to
+ * client, typically failure to respond to client
+ * @throws WrappedRpcServerException - an exception to be sent back to
+ * the client that does not require verbose logging by the
+ * Listener thread
+ * @throws InterruptedException
+ */
+ private void processOneRpc(byte[] buf)
+ throws IOException, WrappedRpcServerException, InterruptedException {
+ int callId = -1;
+ int retry = RpcConstants.INVALID_RETRY_COUNT;
+ try {
+ final DataInputStream dis =
+ new DataInputStream(new ByteArrayInputStream(buf));
+ final RpcRequestHeaderProto header =
+ decodeProtobufFromStream(RpcRequestHeaderProto.newBuilder(), dis);
+ callId = header.getCallId();
+ retry = header.getRetryCount();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(" got #" + callId);
}
+ checkRpcHeaders(header);
+
+ if (callId < 0) { // callIds typically used during connection setup
+ processRpcOutOfBandRequest(header, dis);
+ } else if (!connectionContextRead) {
+ throw new WrappedRpcServerException(
+ RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
+ "Connection context not established");
+ } else {
+ processRpcRequest(header, dis);
+ }
+ } catch (WrappedRpcServerException wrse) { // inform client of error
+ Throwable ioe = wrse.getCause();
+ final Call call = new Call(callId, retry, null, this);
+ setupResponse(authFailedResponse, call,
+ RpcStatusProto.FATAL, wrse.getRpcErrorCodeProto(), null,
+ ioe.getClass().getName(), ioe.getMessage());
+ responder.doRespond(call);
+ throw wrse;
}
}
-
+
/**
- * Process an RPC Request - the connection headers and context have been
- * read
- * @param buf - contains the RPC request header and the rpc request
- * @throws RpcServerException due to fatal rpc layer issues such as
- * invalid header. In this case a RPC fatal status response is sent back
- * to client.
+ * Verify RPC header is valid
+ * @param header - RPC request header
+ * @throws WrappedRpcServerException - header contains invalid values
*/
-
- private void processRpcRequest(byte[] buf)
- throws RpcServerException, IOException, InterruptedException {
- DataInputStream dis =
- new DataInputStream(new ByteArrayInputStream(buf));
- RpcRequestHeaderProto header = RpcRequestHeaderProto.parseDelimitedFrom(dis);
-
- if (LOG.isDebugEnabled())
- LOG.debug(" got #" + header.getCallId());
+ private void checkRpcHeaders(RpcRequestHeaderProto header)
+ throws WrappedRpcServerException {
if (!header.hasRpcOp()) {
String err = " IPC Server: No rpc op in rpcRequestHeader";
- respondBadRpcHeader(new Call(header.getCallId(), null, this),
- RpcServerException.class.getName(), err);
- throw new RpcServerException(err);
+ throw new WrappedRpcServerException(
+ RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);
}
if (header.getRpcOp() !=
RpcRequestHeaderProto.OperationProto.RPC_FINAL_PACKET) {
String err = "IPC Server does not implement rpc header operation" +
header.getRpcOp();
- respondBadRpcHeader(new Call(header.getCallId(), null, this),
- RpcServerException.class.getName(), err);
- throw new RpcServerException(err);
+ throw new WrappedRpcServerException(
+ RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);
}
// If we know the rpc kind, get its class so that we can deserialize
// (Note it would make more sense to have the handler deserialize but
// we continue with this original design.
if (!header.hasRpcKind()) {
String err = " IPC Server: No rpc kind in rpcRequestHeader";
- respondBadRpcHeader(new Call(header.getCallId(), null, this),
- RpcServerException.class.getName(), err);
- throw new RpcServerException(err);
+ throw new WrappedRpcServerException(
+ RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);
}
+ }
+
+ /**
+ * Process an RPC Request - the connection headers and context must
+ * have been already read
+ * @param header - RPC request header
+ * @param dis - stream to request payload
+ * @throws WrappedRpcServerException - due to fatal rpc layer issues such
+ * as invalid header or deserialization error. In this case a RPC fatal
+ * status response will later be sent back to client.
+ * @throws InterruptedException
+ */
+ private void processRpcRequest(RpcRequestHeaderProto header,
+ DataInputStream dis) throws WrappedRpcServerException,
+ InterruptedException {
Class<? extends Writable> rpcRequestClass =
getRpcRequestWrapper(header.getRpcKind());
if (rpcRequestClass == null) {
@@ -1745,9 +1851,8 @@ public abstract class Server {
" from client " + getHostAddress());
final String err = "Unknown rpc kind in rpc header" +
header.getRpcKind();
- respondBadRpcHeader(new Call(header.getCallId(), null, this),
- RpcServerException.class.getName(), err);
- throw new RpcServerException(err);
+ throw new WrappedRpcServerException(
+ RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);
}
Writable rpcRequest;
try { //Read the rpc request
@@ -1757,28 +1862,67 @@ public abstract class Server {
LOG.warn("Unable to read call parameters for client " +
getHostAddress() + "on connection protocol " +
this.protocolName + " for rpcKind " + header.getRpcKind(), t);
- final Call readParamsFailedCall =
- new Call(header.getCallId(), null, this);
- ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
String err = "IPC server unable to read call parameters: "+ t.getMessage();
-
- setupResponse(responseBuffer, readParamsFailedCall,
- RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST,
- null, t.getClass().getName(),
- err);
- responder.doRespond(readParamsFailedCall);
- throw new RpcServerException(err, t);
+ throw new WrappedRpcServerException(
+ RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST, err);
}
- Call call = new Call(header.getCallId(), rpcRequest, this,
- ProtoUtil.convert(header.getRpcKind()));
+ Call call = new Call(header.getCallId(), header.getRetryCount(),
+ rpcRequest, this, ProtoUtil.convert(header.getRpcKind()), header
+ .getClientId().toByteArray());
callQueue.put(call); // queue the call; maybe blocked here
incRpcCount(); // Increment the rpc count
}
- private boolean authorizeConnection() throws IOException {
+
+ /**
+ * Establish RPC connection setup by negotiating SASL if required, then
+ * reading and authorizing the connection header
+ * @param header - RPC header
+ * @param dis - stream to request payload
+ * @throws WrappedRpcServerException - setup failed due to SASL
+ * negotiation failure, premature or invalid connection context,
+ * or other state errors
+ * @throws IOException - failed to send a response back to the client
+ * @throws InterruptedException
+ */
+ private void processRpcOutOfBandRequest(RpcRequestHeaderProto header,
+ DataInputStream dis) throws WrappedRpcServerException, IOException,
+ InterruptedException {
+ final int callId = header.getCallId();
+ if (callId == CONNECTION_CONTEXT_CALL_ID) {
+ // SASL must be established prior to connection context
+ if (authProtocol == AuthProtocol.SASL && !saslContextEstablished) {
+ throw new WrappedRpcServerException(
+ RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
+ "Connection header sent during SASL negotiation");
+ }
+ // read and authorize the user
+ processConnectionContext(dis);
+ } else if (callId == AuthProtocol.SASL.callId) {
+ // SASL messages are only valid if the client requested the SASL protocol
+ if (authProtocol != AuthProtocol.SASL) {
+ throw new WrappedRpcServerException(
+ RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
+ "SASL protocol not requested by client");
+ }
+ saslReadAndProcess(dis);
+ } else if (callId == PING_CALL_ID) {
+ LOG.debug("Received ping message");
+ } else {
+ throw new WrappedRpcServerException(
+ RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
+ "Unknown out of band call #" + callId);
+ }
+ }
+
+ /**
+ * Authorize proxy users to access this server
+ * @throws WrappedRpcServerException - user is not allowed to proxy
+ */
+ private void authorizeConnection() throws WrappedRpcServerException {
try {
- // If auth method is DIGEST, the token was obtained by the
+ // If auth method is TOKEN, the token was obtained by the
// real user for the effective user, therefore not required to
// authorize real user. doAs is allowed only for simple or kerberos
// authentication
@@ -1792,17 +1936,37 @@ public abstract class Server {
}
rpcMetrics.incrAuthorizationSuccesses();
} catch (AuthorizationException ae) {
+ LOG.info("Connection from " + this
+ + " for protocol " + connectionContext.getProtocol()
+ + " is unauthorized for user " + user);
rpcMetrics.incrAuthorizationFailures();
- setupResponse(authFailedResponse, authFailedCall,
- RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_UNAUTHORIZED, null,
- ae.getClass().getName(), ae.getMessage());
- responder.doRespond(authFailedCall);
- return false;
+ throw new WrappedRpcServerException(
+ RpcErrorCodeProto.FATAL_UNAUTHORIZED, ae);
}
- return true;
}
/**
+ * Decode a protobuf from the given input stream
+ * @param builder - Builder of the protobuf to decode
+ * @param dis - DataInputStream to read the protobuf
+ * @return Message - decoded protobuf
+ * @throws WrappedRpcServerException - deserialization failed
+ */
+ @SuppressWarnings("unchecked")
+ private <T extends Message> T decodeProtobufFromStream(Builder builder,
+ DataInputStream dis) throws WrappedRpcServerException {
+ try {
+ builder.mergeDelimitedFrom(dis);
+ return (T)builder.build();
+ } catch (Exception ioe) {
+ Class<?> protoClass = builder.getDefaultInstanceForType().getClass();
+ throw new WrappedRpcServerException(
+ RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST,
+ "Error decoding " + protoClass.getSimpleName() + ": "+ ioe);
+ }
+ }
+
+ /**
* Get service class for connection
* @return the serviceClass
*/
@@ -1818,7 +1982,7 @@ public abstract class Server {
this.serviceClass = serviceClass;
}
- private synchronized void close() throws IOException {
+ private synchronized void close() {
disposeSasl();
data = null;
dataLengthBuffer = null;
@@ -1851,8 +2015,7 @@ public abstract class Server {
try {
final Call call = callQueue.take(); // pop the queue; maybe blocked here
if (LOG.isDebugEnabled()) {
- LOG.debug(getName() + ": has Call#" + call.callId +
- "for RpcKind " + call.rpcKind + " from " + call.connection);
+ LOG.debug(getName() + ": " + call + " for RpcKind " + call.rpcKind);
}
String errorClass = null;
String error = null;
@@ -2050,17 +2213,23 @@ public abstract class Server {
private RpcSaslProto buildNegotiateResponse(List<AuthMethod> authMethods)
throws IOException {
RpcSaslProto.Builder negotiateBuilder = RpcSaslProto.newBuilder();
- negotiateBuilder.setState(SaslState.NEGOTIATE);
- for (AuthMethod authMethod : authMethods) {
- if (authMethod == AuthMethod.SIMPLE) { // not a SASL method
- continue;
- }
- SaslRpcServer saslRpcServer = new SaslRpcServer(authMethod);
- negotiateBuilder.addAuthsBuilder()
- .setMethod(authMethod.toString())
- .setMechanism(saslRpcServer.mechanism)
- .setProtocol(saslRpcServer.protocol)
- .setServerId(saslRpcServer.serverId);
+ if (authMethods.contains(AuthMethod.SIMPLE) && authMethods.size() == 1) {
+ // SIMPLE-only servers return success in response to negotiate
+ negotiateBuilder.setState(SaslState.SUCCESS);
+ } else {
+ negotiateBuilder.setState(SaslState.NEGOTIATE);
+ for (AuthMethod authMethod : authMethods) {
+ SaslRpcServer saslRpcServer = new SaslRpcServer(authMethod);
+ SaslAuth.Builder builder = negotiateBuilder.addAuthsBuilder()
+ .setMethod(authMethod.toString())
+ .setMechanism(saslRpcServer.mechanism);
+ if (saslRpcServer.protocol != null) {
+ builder.setProtocol(saslRpcServer.protocol);
+ }
+ if (saslRpcServer.serverId != null) {
+ builder.setServerId(saslRpcServer.serverId);
+ }
+ }
}
return negotiateBuilder.build();
}
@@ -2095,10 +2264,7 @@ public abstract class Server {
if (connectionList.remove(connection))
numConnections--;
}
- try {
- connection.close();
- } catch (IOException e) {
- }
+ connection.close();
}
/**
@@ -2120,9 +2286,11 @@ public abstract class Server {
DataOutputStream out = new DataOutputStream(responseBuf);
RpcResponseHeaderProto.Builder headerBuilder =
RpcResponseHeaderProto.newBuilder();
+ headerBuilder.setClientId(ByteString.copyFrom(call.clientId));
headerBuilder.setCallId(call.callId);
+ headerBuilder.setRetryCount(call.retryCount);
headerBuilder.setStatus(status);
- headerBuilder.setServerIpcVersionNum(Server.CURRENT_VERSION);
+ headerBuilder.setServerIpcVersionNum(CURRENT_VERSION);
if (status == RpcStatusProto.SUCCESS) {
RpcResponseHeaderProto header = headerBuilder.build();
@@ -2206,18 +2374,6 @@ public abstract class Server {
}
- private void respondBadRpcHeader(Call call, String errorClass, String error)
- throws IOException
- {
- ByteArrayOutputStream responseBuf = new ByteArrayOutputStream();
- setupResponse(responseBuf, call,
- RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
- null, errorClass, error);
- responder.doRespond(call);
- return;
-
- }
-
private void wrapWithSasl(ByteArrayOutputStream response, Call call)
throws IOException {
if (call.connection.saslServer != null) {
@@ -2231,9 +2387,21 @@ public abstract class Server {
LOG.debug("Adding saslServer wrapped token of size " + token.length
+ " as call response.");
response.reset();
- DataOutputStream saslOut = new DataOutputStream(response);
- saslOut.writeInt(token.length);
- saslOut.write(token, 0, token.length);
+ // rebuild with sasl header and payload
+ RpcResponseHeaderProto saslHeader = RpcResponseHeaderProto.newBuilder()
+ .setCallId(AuthProtocol.SASL.callId)
+ .setStatus(RpcStatusProto.SUCCESS)
+ .build();
+ RpcSaslProto saslMessage = RpcSaslProto.newBuilder()
+ .setState(SaslState.WRAP)
+ .setToken(ByteString.copyFrom(token, 0, token.length))
+ .build();
+ RpcResponseMessageWrapper saslResponse =
+ new RpcResponseMessageWrapper(saslHeader, saslMessage);
+
+ DataOutputStream out = new DataOutputStream(response);
+ out.writeInt(saslResponse.getLength());
+ saslResponse.write(out);
}
}
Propchange: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/package.html
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/file/package.html
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/ganglia/package.html
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/package.html
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/spi/package.html
------------------------------------------------------------------------------
svn:eol-style = native
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/util/MetricsDynamicMBeanBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/util/MetricsDynamicMBeanBase.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/util/MetricsDynamicMBeanBase.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/util/MetricsDynamicMBeanBase.java Mon Aug 12 21:25:49 2013
@@ -129,7 +129,7 @@ public abstract class MetricsDynamicMBea
@Override
public Object getAttribute(String attributeName) throws AttributeNotFoundException,
MBeanException, ReflectionException {
- if (attributeName == null || attributeName.equals(""))
+ if (attributeName == null || attributeName.isEmpty())
throw new IllegalArgumentException();
updateMbeanInfoIfMetricsListChanged();
@@ -197,7 +197,7 @@ public abstract class MetricsDynamicMBea
public Object invoke(String actionName, Object[] parms, String[] signature)
throws MBeanException, ReflectionException {
- if (actionName == null || actionName.equals(""))
+ if (actionName == null || actionName.isEmpty())
throw new IllegalArgumentException();
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java Mon Aug 12 21:25:49 2013
@@ -384,7 +384,7 @@ public class MetricsSystemImpl extends M
private void snapshotMetrics(MetricsSourceAdapter sa,
MetricsBufferBuilder bufferBuilder) {
long startTime = Time.now();
- bufferBuilder.add(sa.name(), sa.getMetrics(collector, false));
+ bufferBuilder.add(sa.name(), sa.getMetrics(collector, true));
collector.clear();
snapshotStat.add(Time.now() - startTime);
LOG.debug("Snapshotted source "+ sa.name());
Propchange: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/package.html
------------------------------------------------------------------------------
svn:eol-style = native
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/record/CsvRecordInput.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/record/CsvRecordInput.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/record/CsvRecordInput.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/record/CsvRecordInput.java Mon Aug 12 21:25:49 2013
@@ -144,7 +144,7 @@ public class CsvRecordInput implements R
@Override
public void startRecord(String tag) throws IOException {
- if (tag != null && !"".equals(tag)) {
+ if (tag != null && !tag.isEmpty()) {
char c1 = (char) stream.read();
char c2 = (char) stream.read();
if (c1 != 's' || c2 != '{') {
@@ -156,7 +156,7 @@ public class CsvRecordInput implements R
@Override
public void endRecord(String tag) throws IOException {
char c = (char) stream.read();
- if (tag == null || "".equals(tag)) {
+ if (tag == null || tag.isEmpty()) {
if (c != '\n' && c != '\r') {
throw new IOException("Error deserializing record.");
} else {
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/record/CsvRecordOutput.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/record/CsvRecordOutput.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/record/CsvRecordOutput.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/record/CsvRecordOutput.java Mon Aug 12 21:25:49 2013
@@ -115,7 +115,7 @@ public class CsvRecordOutput implements
@Override
public void startRecord(Record r, String tag) throws IOException {
- if (tag != null && !"".equals(tag)) {
+ if (tag != null && ! tag.isEmpty()) {
printCommaUnlessFirst();
stream.print("s{");
isFirst = true;
@@ -124,7 +124,7 @@ public class CsvRecordOutput implements
@Override
public void endRecord(Record r, String tag) throws IOException {
- if (tag == null || "".equals(tag)) {
+ if (tag == null || tag.isEmpty()) {
stream.print("\n");
isFirst = true;
} else {
Propchange: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/record/compiler/generated/package.html
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/record/compiler/package.html
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/record/package.html
------------------------------------------------------------------------------
svn:eol-style = native
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/RefreshUserMappingsProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/RefreshUserMappingsProtocol.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/RefreshUserMappingsProtocol.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/RefreshUserMappingsProtocol.java Mon Aug 12 21:25:49 2013
@@ -22,6 +22,7 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.io.retry.Idempotent;
import org.apache.hadoop.security.KerberosInfo;
/**
@@ -43,12 +44,13 @@ public interface RefreshUserMappingsProt
* Refresh user to group mappings.
* @throws IOException
*/
+ @Idempotent
public void refreshUserToGroupsMappings() throws IOException;
/**
* Refresh superuser proxy group list
* @throws IOException
*/
- public void refreshSuperUserGroupsConfiguration()
- throws IOException;
+ @Idempotent
+ public void refreshSuperUserGroupsConfiguration() throws IOException;
}
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java Mon Aug 12 21:25:49 2013
@@ -20,18 +20,27 @@ package org.apache.hadoop.security;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
+import java.io.FilterInputStream;
+import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
+import java.util.regex.Pattern;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.kerberos.KerberosPrincipal;
import javax.security.sasl.RealmCallback;
import javax.security.sasl.RealmChoiceCallback;
import javax.security.sasl.Sasl;
@@ -42,10 +51,13 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.GlobPattern;
import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcRequestMessageWrapper;
import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseMessageWrapper;
import org.apache.hadoop.ipc.RPC.RpcKind;
import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.RpcConstants;
import org.apache.hadoop.ipc.Server.AuthProtocol;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto;
@@ -57,8 +69,11 @@ import org.apache.hadoop.security.SaslRp
import org.apache.hadoop.security.authentication.util.KerberosName;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenInfo;
+import org.apache.hadoop.security.token.TokenSelector;
import org.apache.hadoop.util.ProtoUtil;
+import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
/**
* A utility class that encapsulates SASL logic for RPC client
@@ -68,54 +83,155 @@ import com.google.protobuf.ByteString;
public class SaslRpcClient {
public static final Log LOG = LogFactory.getLog(SaslRpcClient.class);
- private final AuthMethod authMethod;
- private final SaslClient saslClient;
- private final boolean fallbackAllowed;
- private static final RpcRequestHeaderProto saslHeader =
- ProtoUtil.makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
- OperationProto.RPC_FINAL_PACKET, AuthProtocol.SASL.callId);
+ private final UserGroupInformation ugi;
+ private final Class<?> protocol;
+ private final InetSocketAddress serverAddr;
+ private final Configuration conf;
+
+ private SaslClient saslClient;
+ private AuthMethod authMethod;
+
+ private static final RpcRequestHeaderProto saslHeader = ProtoUtil
+ .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
+ OperationProto.RPC_FINAL_PACKET, AuthProtocol.SASL.callId,
+ RpcConstants.INVALID_RETRY_COUNT, RpcConstants.DUMMY_CLIENT_ID);
private static final RpcSaslProto negotiateRequest =
RpcSaslProto.newBuilder().setState(SaslState.NEGOTIATE).build();
/**
- * Create a SaslRpcClient for an authentication method
+ * Create a SaslRpcClient that can be used by an RPC client to negotiate
+ * SASL authentication with a RPC server
+ * @param ugi - connecting user
+ * @param protocol - RPC protocol
+ * @param serverAddr - InetSocketAddress of remote server
+ * @param conf - Configuration
+ */
+ public SaslRpcClient(UserGroupInformation ugi, Class<?> protocol,
+ InetSocketAddress serverAddr, Configuration conf) {
+ this.ugi = ugi;
+ this.protocol = protocol;
+ this.serverAddr = serverAddr;
+ this.conf = conf;
+ }
+
+ @VisibleForTesting
+ @InterfaceAudience.Private
+ public Object getNegotiatedProperty(String key) {
+ return (saslClient != null) ? saslClient.getNegotiatedProperty(key) : null;
+ }
+
+
+ // the RPC Client has an inelegant way of handling expiration of TGTs
+ // acquired via a keytab. any connection failure causes a relogin, so
+ // the Client needs to know what authMethod was being attempted if an
+ // exception occurs. the SASL prep for a kerberos connection should
+ // ideally relogin if necessary instead of exposing this detail to the
+ // Client
+ @InterfaceAudience.Private
+ public AuthMethod getAuthMethod() {
+ return authMethod;
+ }
+
+ /**
+ * Instantiate a sasl client for the first supported auth type in the
+ * given list. The auth type must be defined, enabled, and the user
+ * must possess the required credentials, else the next auth is tried.
*
- * @param method
- * the requested authentication method
- * @param token
- * token to use if needed by the authentication method
+ * @param authTypes to attempt in the given order
+ * @return SaslAuth of instantiated client
+ * @throws AccessControlException - client doesn't support any of the auths
+ * @throws IOException - misc errors
*/
- public SaslRpcClient(AuthMethod method,
- Token<? extends TokenIdentifier> token, String serverPrincipal,
- boolean fallbackAllowed)
- throws IOException {
- this.authMethod = method;
- this.fallbackAllowed = fallbackAllowed;
+ private SaslAuth selectSaslClient(List<SaslAuth> authTypes)
+ throws SaslException, AccessControlException, IOException {
+ SaslAuth selectedAuthType = null;
+ boolean switchToSimple = false;
+ for (SaslAuth authType : authTypes) {
+ if (!isValidAuthType(authType)) {
+ continue; // don't know what it is, try next
+ }
+ AuthMethod authMethod = AuthMethod.valueOf(authType.getMethod());
+ if (authMethod == AuthMethod.SIMPLE) {
+ switchToSimple = true;
+ } else {
+ saslClient = createSaslClient(authType);
+ if (saslClient == null) { // client lacks credentials, try next
+ continue;
+ }
+ }
+ selectedAuthType = authType;
+ break;
+ }
+ if (saslClient == null && !switchToSimple) {
+ List<String> serverAuthMethods = new ArrayList<String>();
+ for (SaslAuth authType : authTypes) {
+ serverAuthMethods.add(authType.getMethod());
+ }
+ throw new AccessControlException(
+ "Client cannot authenticate via:" + serverAuthMethods);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Use " + selectedAuthType.getMethod() +
+ " authentication for protocol " + protocol.getSimpleName());
+ }
+ return selectedAuthType;
+ }
+
+
+ private boolean isValidAuthType(SaslAuth authType) {
+ AuthMethod authMethod;
+ try {
+ authMethod = AuthMethod.valueOf(authType.getMethod());
+ } catch (IllegalArgumentException iae) { // unknown auth
+ authMethod = null;
+ }
+ // do we know what it is? is it using our mechanism?
+ return authMethod != null &&
+ authMethod.getMechanismName().equals(authType.getMechanism());
+ }
+
+ /**
+ * Try to create a SaslClient for an authentication type. May return
+ * null if the type isn't supported or the client lacks the required
+ * credentials.
+ *
+ * @param authType - the requested authentication method
+ * @return SaslClient for the authType or null
+ * @throws SaslException - error instantiating client
+ * @throws IOException - misc errors
+ */
+ private SaslClient createSaslClient(SaslAuth authType)
+ throws SaslException, IOException {
String saslUser = null;
- String saslProtocol = null;
- String saslServerName = null;
+ // SASL requires the client and server to use the same proto and serverId
+ // if necessary, auth types below will verify they are valid
+ final String saslProtocol = authType.getProtocol();
+ final String saslServerName = authType.getServerId();
Map<String, String> saslProperties = SaslRpcServer.SASL_PROPS;
CallbackHandler saslCallback = null;
+ final AuthMethod method = AuthMethod.valueOf(authType.getMethod());
switch (method) {
case TOKEN: {
- saslProtocol = "";
- saslServerName = SaslRpcServer.SASL_DEFAULT_REALM;
+ Token<?> token = getServerToken(authType);
+ if (token == null) {
+ return null; // tokens aren't supported or user doesn't have one
+ }
saslCallback = new SaslClientCallbackHandler(token);
break;
}
case KERBEROS: {
- if (serverPrincipal == null || serverPrincipal.isEmpty()) {
- throw new IOException(
- "Failed to specify server's Kerberos principal name");
- }
- KerberosName name = new KerberosName(serverPrincipal);
- saslProtocol = name.getServiceName();
- saslServerName = name.getHostName();
- if (saslServerName == null) {
- throw new IOException(
- "Kerberos principal name does NOT have the expected hostname part: "
- + serverPrincipal);
+ if (ugi.getRealAuthenticationMethod().getAuthMethod() !=
+ AuthMethod.KERBEROS) {
+ return null; // client isn't using kerberos
+ }
+ String serverPrincipal = getServerPrincipal(authType);
+ if (serverPrincipal == null) {
+ return null; // protocol doesn't use kerberos
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("RPC Server's Kerberos principal name for protocol="
+ + protocol.getCanonicalName() + " is " + serverPrincipal);
}
break;
}
@@ -125,16 +241,93 @@ public class SaslRpcClient {
String mechanism = method.getMechanismName();
if (LOG.isDebugEnabled()) {
- LOG.debug("Creating SASL " + mechanism + "(" + authMethod + ") "
+ LOG.debug("Creating SASL " + mechanism + "(" + method + ") "
+ " client to authenticate to service at " + saslServerName);
}
- saslClient = Sasl.createSaslClient(
+ return Sasl.createSaslClient(
new String[] { mechanism }, saslUser, saslProtocol, saslServerName,
saslProperties, saslCallback);
- if (saslClient == null) {
- throw new IOException("Unable to find SASL client implementation");
+ }
+
+ /**
+ * Try to locate the required token for the server.
+ *
+ * @param authType of the SASL client
+ * @return Token<?> for server, or null if no token available
+ * @throws IOException - token selector cannot be instantiated
+ */
+ private Token<?> getServerToken(SaslAuth authType) throws IOException {
+ TokenInfo tokenInfo = SecurityUtil.getTokenInfo(protocol, conf);
+ LOG.debug("Get token info proto:"+protocol+" info:"+tokenInfo);
+ if (tokenInfo == null) { // protocol has no support for tokens
+ return null;
+ }
+ TokenSelector<?> tokenSelector = null;
+ try {
+ tokenSelector = tokenInfo.value().newInstance();
+ } catch (InstantiationException e) {
+ throw new IOException(e.toString());
+ } catch (IllegalAccessException e) {
+ throw new IOException(e.toString());
+ }
+ return tokenSelector.selectToken(
+ SecurityUtil.buildTokenService(serverAddr), ugi.getTokens());
+ }
+
+ /**
+ * Get the remote server's principal. The value will be obtained from
+ * the config and cross-checked against the server's advertised principal.
+ *
+ * @param authType of the SASL client
+ * @return String of the server's principal
+ * @throws IOException - error determining configured principal
+ */
+ @VisibleForTesting
+ String getServerPrincipal(SaslAuth authType) throws IOException {
+ KerberosInfo krbInfo = SecurityUtil.getKerberosInfo(protocol, conf);
+ LOG.debug("Get kerberos info proto:"+protocol+" info:"+krbInfo);
+ if (krbInfo == null) { // protocol has no support for kerberos
+ return null;
+ }
+ String serverKey = krbInfo.serverPrincipal();
+ if (serverKey == null) {
+ throw new IllegalArgumentException(
+ "Can't obtain server Kerberos config key from protocol="
+ + protocol.getCanonicalName());
+ }
+ // construct server advertised principal for comparison
+ String serverPrincipal = new KerberosPrincipal(
+ authType.getProtocol() + "/" + authType.getServerId()).getName();
+ boolean isPrincipalValid = false;
+
+ // use the pattern if defined
+ String serverKeyPattern = conf.get(serverKey + ".pattern");
+ if (serverKeyPattern != null && !serverKeyPattern.isEmpty()) {
+ Pattern pattern = GlobPattern.compile(serverKeyPattern);
+ isPrincipalValid = pattern.matcher(serverPrincipal).matches();
+ } else {
+ // check that the server advertised principal matches our conf
+ String confPrincipal = SecurityUtil.getServerPrincipal(
+ conf.get(serverKey), serverAddr.getAddress());
+ if (confPrincipal == null || confPrincipal.isEmpty()) {
+ throw new IllegalArgumentException(
+ "Failed to specify server's Kerberos principal name");
+ }
+ KerberosName name = new KerberosName(confPrincipal);
+ if (name.getHostName() == null) {
+ throw new IllegalArgumentException(
+ "Kerberos principal name does NOT have the expected hostname part: "
+ + confPrincipal);
+ }
+ isPrincipalValid = serverPrincipal.equals(confPrincipal);
+ }
+ if (!isPrincipalValid) {
+ throw new IllegalArgumentException(
+ "Server has invalid Kerberos principal: " + serverPrincipal);
}
+ return serverPrincipal;
}
+
/**
* Do client side SASL authentication with server via the given InputStream
@@ -144,18 +337,19 @@ public class SaslRpcClient {
* InputStream to use
* @param outS
* OutputStream to use
- * @return true if connection is set up, or false if needs to switch
- * to simple Auth.
+ * @return AuthMethod used to negotiate the connection
* @throws IOException
*/
- public boolean saslConnect(InputStream inS, OutputStream outS)
+ public AuthMethod saslConnect(InputStream inS, OutputStream outS)
throws IOException {
DataInputStream inStream = new DataInputStream(new BufferedInputStream(inS));
DataOutputStream outStream = new DataOutputStream(new BufferedOutputStream(
outS));
- // track if SASL ever started, or server switched us to simple
- boolean inSasl = false;
+ // redefined if/when a SASL negotiation starts, can be queried if the
+ // negotiation fails
+ authMethod = AuthMethod.SIMPLE;
+
sendSaslMessage(outStream, negotiateRequest);
// loop until sasl is complete or a rpc error occurs
@@ -189,50 +383,49 @@ public class SaslRpcClient {
RpcSaslProto.Builder response = null;
switch (saslMessage.getState()) {
case NEGOTIATE: {
- inSasl = true;
- // TODO: should instantiate sasl client based on advertisement
- // but just blindly use the pre-instantiated sasl client for now
- String clientAuthMethod = authMethod.toString();
- SaslAuth saslAuthType = null;
- for (SaslAuth authType : saslMessage.getAuthsList()) {
- if (clientAuthMethod.equals(authType.getMethod())) {
- saslAuthType = authType;
- break;
- }
- }
- if (saslAuthType == null) {
- saslAuthType = SaslAuth.newBuilder()
- .setMethod(clientAuthMethod)
- .setMechanism(saslClient.getMechanismName())
- .build();
- }
+ // create a compatible SASL client, throws if no supported auths
+ SaslAuth saslAuthType = selectSaslClient(saslMessage.getAuthsList());
+ // define auth being attempted, caller can query if connect fails
+ authMethod = AuthMethod.valueOf(saslAuthType.getMethod());
- byte[] challengeToken = null;
- if (saslAuthType != null && saslAuthType.hasChallenge()) {
- // server provided the first challenge
- challengeToken = saslAuthType.getChallenge().toByteArray();
- saslAuthType =
- SaslAuth.newBuilder(saslAuthType).clearChallenge().build();
- } else if (saslClient.hasInitialResponse()) {
- challengeToken = new byte[0];
+ byte[] responseToken = null;
+ if (authMethod == AuthMethod.SIMPLE) { // switching to SIMPLE
+ done = true; // not going to wait for success ack
+ } else {
+ byte[] challengeToken = null;
+ if (saslAuthType.hasChallenge()) {
+ // server provided the first challenge
+ challengeToken = saslAuthType.getChallenge().toByteArray();
+ saslAuthType =
+ SaslAuth.newBuilder(saslAuthType).clearChallenge().build();
+ } else if (saslClient.hasInitialResponse()) {
+ challengeToken = new byte[0];
+ }
+ responseToken = (challengeToken != null)
+ ? saslClient.evaluateChallenge(challengeToken)
+ : new byte[0];
}
- byte[] responseToken = (challengeToken != null)
- ? saslClient.evaluateChallenge(challengeToken)
- : new byte[0];
-
response = createSaslReply(SaslState.INITIATE, responseToken);
response.addAuths(saslAuthType);
break;
}
case CHALLENGE: {
- inSasl = true;
+ if (saslClient == null) {
+ // should probably instantiate a client to allow a server to
+ // demand a specific negotiation
+ throw new SaslException("Server sent unsolicited challenge");
+ }
byte[] responseToken = saslEvaluateToken(saslMessage, false);
response = createSaslReply(SaslState.RESPONSE, responseToken);
break;
}
case SUCCESS: {
- if (inSasl && saslEvaluateToken(saslMessage, true) != null) {
- throw new SaslException("SASL client generated spurious token");
+ // simple server sends immediate success to a SASL client for
+ // switch to simple
+ if (saslClient == null) {
+ authMethod = AuthMethod.SIMPLE;
+ } else {
+ saslEvaluateToken(saslMessage, true);
}
done = true;
break;
@@ -246,12 +439,7 @@ public class SaslRpcClient {
sendSaslMessage(outStream, response.build());
}
} while (!done);
- if (!inSasl && !fallbackAllowed) {
- throw new IOException("Server asks us to fall back to SIMPLE " +
- "auth, but this client is configured to only allow secure " +
- "connections.");
- }
- return inSasl;
+ return authMethod;
}
private void sendSaslMessage(DataOutputStream out, RpcSaslProto message)
@@ -266,17 +454,37 @@ public class SaslRpcClient {
out.flush();
}
+ /**
+ * Evaluate the server provided challenge. The server must send a token
+ * if it's not done. If the server is done, the challenge token is
+ * optional because not all mechanisms send a final token for the client to
+ * update its internal state. The client must also be done after
+ * evaluating the optional token to ensure a malicious server doesn't
+ * prematurely end the negotiation with a phony success.
+ *
+ * @param saslResponse - message received from the server; may carry a
+ * challenge token
+ * @param serverIsDone - true when the server has reported the negotiation
+ * as finished, false while it is still challenging
+ * @return the token produced by the SASL client for the server's token, or
+ * null if the message carried no token; always null when
+ * serverIsDone is true, since a non-null token is rejected
+ * @throws SaslException - any problems with negotiation
+ */
private byte[] saslEvaluateToken(RpcSaslProto saslResponse,
- boolean done) throws SaslException {
+ boolean serverIsDone) throws SaslException {
byte[] saslToken = null;
if (saslResponse.hasToken()) {
saslToken = saslResponse.getToken().toByteArray();
saslToken = saslClient.evaluateChallenge(saslToken);
- } else if (!done) {
- throw new SaslException("Challenge contains no token");
- }
- if (done && !saslClient.isComplete()) {
- throw new SaslException("Client is out of sync with server");
+ } else if (!serverIsDone) {
+ // the server may only omit a token when it's done
+ throw new SaslException("Server challenge contains no token");
+ }
+ if (serverIsDone) {
+ // server tried to report success before our client completed
+ if (!saslClient.isComplete()) {
+ throw new SaslException("Client is out of sync with server");
+ }
+ // a client cannot generate a response to a success message
+ if (saslToken != null) {
+ throw new SaslException("Client generated spurious response");
+ }
}
return saslToken;
}
@@ -291,41 +499,147 @@ public class SaslRpcClient {
return response;
}
+ /**
+ * Decide whether traffic on this connection has to be SASL wrapped.
+ * Requires a completed negotiation, see the note on the property lookup
+ * below.
+ *
+ * @return true if a QOP was negotiated and it is anything other than
+ * "auth" (compared ignoring case), false otherwise
+ */
+ private boolean useWrap() {
+ // getNegotiatedProperty throws if client isn't complete
+ String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP);
+ // SASL wrapping is only used if the connection has a QOP, and
+ // the value is not auth. ex. auth-int & auth-priv
+ return qop != null && !"auth".equalsIgnoreCase(qop);
+ }
+
/**
- * Get a SASL wrapped InputStream. Can be called only after saslConnect() has
- * been called.
+ * Get SASL wrapped InputStream if SASL QoP requires unwrapping,
+ * otherwise return original stream. Can be called only after
+ * saslConnect() has been called.
*
- * @param in
- * the InputStream to wrap
- * @return a SASL wrapped InputStream
+ * @param in - InputStream used to make the connection
+ * @return InputStream that may be using SASL unwrap
* @throws IOException
*/
public InputStream getInputStream(InputStream in) throws IOException {
- if (!saslClient.isComplete()) {
- throw new IOException("Sasl authentication exchange hasn't completed yet");
+ // only a negotiated QOP other than "auth" needs the unwrapping stream;
+ // otherwise the caller keeps reading from the stream it passed in
+ if (useWrap()) {
+ in = new WrappedInputStream(in);
}
- return new SaslInputStream(in, saslClient);
+ return in;
}
/**
- * Get a SASL wrapped OutputStream. Can be called only after saslConnect() has
- * been called.
+ * Get SASL wrapped OutputStream if SASL QoP requires wrapping,
+ * otherwise return original stream. Can be called only after
+ * saslConnect() has been called.
*
- * @param out
- * the OutputStream to wrap
- * @return a SASL wrapped OutputStream
+ * @param out - OutputStream used to make the connection
+ * @return OutputStream that may be using SASL wrap
* @throws IOException
*/
public OutputStream getOutputStream(OutputStream out) throws IOException {
- if (!saslClient.isComplete()) {
- throw new IOException("Sasl authentication exchange hasn't completed yet");
+ if (useWrap()) {
+ // the client and server negotiate a maximum buffer size that can be
+ // wrapped
+ String maxBuf = (String)saslClient.getNegotiatedProperty(Sasl.RAW_SEND_SIZE);
+ // buffer in front of the wrapping stream, sized to the negotiated
+ // maximum, so each flush hands it at most that many bytes to wrap
+ out = new BufferedOutputStream(new WrappedOutputStream(out),
+ Integer.parseInt(maxBuf));
+ }
+ return out;
+ }
+
+ // ideally this should be folded into the RPC decoding loop but it's
+ // currently split across Client and SaslRpcClient...
+ //
+ // Input stream that reads length-prefixed RPC packets from the underlying
+ // stream, requires every packet to be a SASL WRAP message, and serves the
+ // unwrapped bytes to the caller.
+ class WrappedInputStream extends FilterInputStream {
+ // unwrapped payload of the most recently read packet; starts out empty
+ private ByteBuffer unwrappedRpcBuffer = ByteBuffer.allocate(0);
+ public WrappedInputStream(InputStream in) throws IOException {
+ super(in);
+ }
+
+ // Read one unwrapped byte, returned as an int in the range 0-255.
+ @Override
+ public int read() throws IOException {
+ byte[] b = new byte[1];
+ int n = read(b, 0, 1);
+ // mask off the sign extension: returning the raw byte would make values
+ // >= 0x80 negative, and 0xff would be mistaken for the -1 EOF marker
+ return (n != -1) ? (b[0] & 0xff) : -1;
+ }
+
+ @Override
+ public int read(byte b[]) throws IOException {
+ return read(b, 0, b.length);
+ }
+
+ // Copy up to len unwrapped bytes into buf, reading and unwrapping the next
+ // packet first if the current one has been fully consumed. Returns the
+ // number of bytes copied, which may be fewer than len.
+ @Override
+ public int read(byte[] buf, int off, int len) throws IOException {
+ // NOTE(review): the monitor is the buffer object itself, and
+ // readNextRpcPacket() replaces that object, so the lock changes from
+ // packet to packet -- confirm whether a fixed lock is needed
+ synchronized(unwrappedRpcBuffer) {
+ // fill the buffer with the next RPC message
+ if (unwrappedRpcBuffer.remaining() == 0) {
+ readNextRpcPacket();
+ }
+ // satisfy as much of the request as possible
+ int readLen = Math.min(len, unwrappedRpcBuffer.remaining());
+ unwrappedRpcBuffer.get(buf, off, readLen);
+ return readLen;
+ }
+ }
+
+ // all messages must be RPC SASL wrapped, else an exception is thrown
+ private void readNextRpcPacket() throws IOException {
+ LOG.debug("reading next wrapped RPC packet");
+ DataInputStream dis = new DataInputStream(in);
+ // packet framing: 4-byte length followed by that many bytes
+ int rpcLen = dis.readInt();
+ byte[] rpcBuf = new byte[rpcLen];
+ dis.readFully(rpcBuf);
+
+ // decode the RPC header
+ ByteArrayInputStream bis = new ByteArrayInputStream(rpcBuf);
+ RpcResponseHeaderProto.Builder headerBuilder =
+ RpcResponseHeaderProto.newBuilder();
+ headerBuilder.mergeDelimitedFrom(bis);
+
+ boolean isWrapped = false;
+ // Must be SASL wrapped, verify and decode.
+ if (headerBuilder.getCallId() == AuthProtocol.SASL.callId) {
+ RpcSaslProto.Builder saslMessage = RpcSaslProto.newBuilder();
+ saslMessage.mergeDelimitedFrom(bis);
+ if (saslMessage.getState() == SaslState.WRAP) {
+ isWrapped = true;
+ byte[] token = saslMessage.getToken().toByteArray();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("unwrapping token of length:" + token.length);
+ }
+ token = saslClient.unwrap(token, 0, token.length);
+ // the unwrapped payload becomes the data served by read()
+ unwrappedRpcBuffer = ByteBuffer.wrap(token);
+ }
+ }
+ if (!isWrapped) {
+ throw new SaslException("Server sent non-wrapped response");
+ }
+ }
- return new SaslOutputStream(out, saslClient);
}
+ // Output stream that SASL wraps each buffer written through
+ // write(byte[], int, int) and sends it to the underlying stream as a
+ // length-prefixed RPC request carrying a SASL WRAP message.
+ // NOTE(review): write(int) is not overridden here; the visible construction
+ // site puts a BufferedOutputStream in front of this class -- confirm no
+ // caller writes single bytes to it directly.
+ class WrappedOutputStream extends FilterOutputStream {
+ public WrappedOutputStream(OutputStream out) throws IOException {
+ super(out);
+ }
+ // Wrap len bytes of buf starting at off and write them as one framed
+ // message; the underlying stream is not flushed here.
+ @Override
+ public void write(byte[] buf, int off, int len) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("wrapping token of length:" + len);
+ }
+ // buf now refers to the wrapped token, not the caller's array
+ buf = saslClient.wrap(buf, off, len);
+ RpcSaslProto saslMessage = RpcSaslProto.newBuilder()
+ .setState(SaslState.WRAP)
+ .setToken(ByteString.copyFrom(buf, 0, buf.length))
+ .build();
+ RpcRequestMessageWrapper request =
+ new RpcRequestMessageWrapper(saslHeader, saslMessage);
+ DataOutputStream dob = new DataOutputStream(out);
+ // framing: 4-byte length, then the header and SASL message
+ dob.writeInt(request.getLength());
+ request.write(dob);
+ }
+ }
+
/**
 * Release resources used by wrapped saslClient. Does nothing when no
 * client is present; the reference is cleared once the client has been
 * disposed, so a repeated call is a no-op.
 *
 * @throws SaslException if disposing the client fails
 */
public void dispose() throws SaslException {
- saslClient.dispose();
+ if (saslClient != null) {
+ saslClient.dispose();
+ saslClient = null;
+ }
}
private static class SaslClientCallbackHandler implements CallbackHandler {
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcServer.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcServer.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcServer.java Mon Aug 12 21:25:49 2013
@@ -47,7 +47,6 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.Server.Connection;
-import org.apache.hadoop.security.authentication.util.KerberosName;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
@@ -104,12 +103,12 @@ public class SaslRpcServer {
String fullName = UserGroupInformation.getCurrentUser().getUserName();
if (LOG.isDebugEnabled())
LOG.debug("Kerberos principal name is " + fullName);
- KerberosName krbName = new KerberosName(fullName);
- serverId = krbName.getHostName();
- if (serverId == null) {
- serverId = "";
- }
- protocol = krbName.getServiceName();
+ // don't use KerberosName because we don't want auth_to_local
+ String[] parts = fullName.split("[/@]", 2);
+ protocol = parts[0];
+ // should verify service host is present here rather than in create()
+ // but lazy tests are using a UGI that isn't a SPN...
+ serverId = (parts.length < 2) ? "" : parts[1];
break;
}
default:
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java Mon Aug 12 21:25:49 2013
@@ -217,7 +217,7 @@ public class SecurityUtil {
private static String replacePattern(String[] components, String hostname)
throws IOException {
String fqdn = hostname;
- if (fqdn == null || fqdn.equals("") || fqdn.equals("0.0.0.0")) {
+ if (fqdn == null || fqdn.isEmpty() || fqdn.equals("0.0.0.0")) {
fqdn = getLocalHostName();
}
return components[0] + "/" + fqdn.toLowerCase(Locale.US) + "@" + components[2];
@@ -672,7 +672,7 @@ public class SecurityUtil {
public static AuthenticationMethod getAuthenticationMethod(Configuration conf) {
String value = conf.get(HADOOP_SECURITY_AUTHENTICATION, "simple");
try {
- return Enum.valueOf(AuthenticationMethod.class, value.toUpperCase());
+ return Enum.valueOf(AuthenticationMethod.class, value.toUpperCase(Locale.ENGLISH));
} catch (IllegalArgumentException iae) {
throw new IllegalArgumentException("Invalid attribute value for " +
HADOOP_SECURITY_AUTHENTICATION + " of " + value);
@@ -685,6 +685,6 @@ public class SecurityUtil {
authenticationMethod = AuthenticationMethod.SIMPLE;
}
conf.set(HADOOP_SECURITY_AUTHENTICATION,
- authenticationMethod.toString().toLowerCase());
+ authenticationMethod.toString().toLowerCase(Locale.ENGLISH));
}
}