You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2010/02/03 02:30:29 UTC
svn commit: r905860 [1/2] - in /hadoop/common/trunk: ./
src/java/org/apache/hadoop/ipc/ src/java/org/apache/hadoop/security/
src/java/org/apache/hadoop/security/token/ src/test/
src/test/core/org/apache/hadoop/ipc/
Author: ddas
Date: Wed Feb 3 01:30:25 2010
New Revision: 905860
URL: http://svn.apache.org/viewvc?rev=905860&view=rev
Log:
HADOOP-6419. Adds SASL based authentication to RPC. Contributed by Kan Zhang.
Added:
hadoop/common/trunk/src/java/org/apache/hadoop/security/KerberosInfo.java
hadoop/common/trunk/src/java/org/apache/hadoop/security/SaslInputStream.java
hadoop/common/trunk/src/java/org/apache/hadoop/security/SaslOutputStream.java
hadoop/common/trunk/src/java/org/apache/hadoop/security/SaslRpcClient.java
hadoop/common/trunk/src/java/org/apache/hadoop/security/SaslRpcServer.java
hadoop/common/trunk/src/java/org/apache/hadoop/security/token/TokenInfo.java
hadoop/common/trunk/src/java/org/apache/hadoop/security/token/TokenSelector.java
hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestSaslRPC.java
Modified:
hadoop/common/trunk/CHANGES.txt
hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java
hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Client.java
hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RPC.java
hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RpcEngine.java
hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Server.java
hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java
hadoop/common/trunk/src/java/org/apache/hadoop/security/token/SecretManager.java
hadoop/common/trunk/src/java/org/apache/hadoop/security/token/TokenIdentifier.java
hadoop/common/trunk/src/test/core-site.xml
hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java
Modified: hadoop/common/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/CHANGES.txt?rev=905860&r1=905859&r2=905860&view=diff
==============================================================================
--- hadoop/common/trunk/CHANGES.txt (original)
+++ hadoop/common/trunk/CHANGES.txt Wed Feb 3 01:30:25 2010
@@ -48,6 +48,9 @@
upon login. The tokens are read from a file specified in the
environment variable. (ddas)
+ HADOOP-6419. Adds SASL based authentication to RPC.
+ (Kan Zhang via ddas)
+
IMPROVEMENTS
HADOOP-6283. Improve the exception messages thrown by
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java?rev=905860&r1=905859&r2=905860&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java Wed Feb 3 01:30:25 2010
@@ -33,6 +33,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.net.NetUtils;
import org.apache.avro.*;
@@ -192,10 +194,13 @@
* port and address. */
public RPC.Server getServer(Class iface, Object impl, String bindAddress,
int port, int numHandlers, boolean verbose,
- Configuration conf) throws IOException {
+ Configuration conf,
+ SecretManager<? extends TokenIdentifier> secretManager
+ ) throws IOException {
return ENGINE.getServer(TunnelProtocol.class,
new TunnelResponder(iface, impl),
- bindAddress, port, numHandlers, verbose, conf);
+ bindAddress, port, numHandlers, verbose, conf,
+ secretManager);
}
}
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Client.java?rev=905860&r1=905859&r2=905860&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Client.java Wed Feb 3 01:30:25 2010
@@ -31,7 +31,9 @@
import java.io.BufferedOutputStream;
import java.io.FilterInputStream;
import java.io.InputStream;
+import java.io.OutputStream;
+import java.security.PrivilegedExceptionAction;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map.Entry;
@@ -44,11 +46,19 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.security.SaslRpcClient;
+import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
+import org.apache.hadoop.security.token.TokenInfo;
import org.apache.hadoop.util.ReflectionUtils;
/** A client for an IPC service. IPC calls take a single {@link Writable} as a
@@ -196,8 +206,13 @@
* socket: responses may be delivered out of order. */
private class Connection extends Thread {
private InetSocketAddress server; // server ip:port
+ private String serverPrincipal; // server's krb5 principal name
private ConnectionHeader header; // connection header
- private ConnectionId remoteId; // connection id
+ private final ConnectionId remoteId; // connection id
+ private final AuthMethod authMethod; // authentication method
+ private final boolean useSasl;
+ private Token<? extends TokenIdentifier> token;
+ private SaslRpcClient saslRpcClient;
private Socket socket = null; // connected socket
private DataInputStream in;
@@ -221,6 +236,42 @@
Class<?> protocol = remoteId.getProtocol();
header =
new ConnectionHeader(protocol == null ? null : protocol.getName(), ticket);
+ this.useSasl = UserGroupInformation.isSecurityEnabled();
+ if (useSasl && protocol != null) {
+ TokenInfo tokenInfo = protocol.getAnnotation(TokenInfo.class);
+ if (tokenInfo != null) {
+ TokenSelector<? extends TokenIdentifier> tokenSelector = null;
+ try {
+ tokenSelector = tokenInfo.value().newInstance();
+ } catch (InstantiationException e) {
+ throw new IOException(e.toString());
+ } catch (IllegalAccessException e) {
+ throw new IOException(e.toString());
+ }
+ InetSocketAddress addr = remoteId.getAddress();
+ token = tokenSelector.selectToken(new Text(addr.getAddress()
+ .getHostAddress() + ":" + addr.getPort()),
+ ticket.getTokens());
+ }
+ KerberosInfo krbInfo = protocol.getAnnotation(KerberosInfo.class);
+ if (krbInfo != null) {
+ String serverKey = krbInfo.value();
+ if (serverKey != null) {
+ serverPrincipal = conf.get(serverKey);
+ }
+ }
+ }
+
+ if (!useSasl) {
+ authMethod = AuthMethod.SIMPLE;
+ } else if (token != null) {
+ authMethod = AuthMethod.DIGEST;
+ } else {
+ authMethod = AuthMethod.KERBEROS;
+ }
+ if (LOG.isDebugEnabled())
+ LOG.debug("Use " + authMethod + " authentication for protocol "
+ + protocol.getSimpleName());
this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " +
remoteId.getAddress().toString() +
@@ -302,11 +353,20 @@
}
}
+ private synchronized void disposeSasl() {
+ if (saslRpcClient != null) {
+ try {
+ saslRpcClient.dispose();
+ } catch (IOException ignored) {
+ }
+ }
+ }
+
/** Connect to the server and set up the I/O streams. It then sends
* a header to the server and starts
* the connection thread that waits for responses.
*/
- private synchronized void setupIOstreams() {
+ private synchronized void setupIOstreams() throws InterruptedException {
if (socket != null || shouldCloseConnection.get()) {
return;
}
@@ -334,15 +394,33 @@
handleConnectionFailure(ioFailures++, maxRetries, ie);
}
}
+ InputStream inStream = NetUtils.getInputStream(socket);
+ OutputStream outStream = NetUtils.getOutputStream(socket);
+ writeRpcHeader(outStream);
+ if (useSasl) {
+ final InputStream in2 = inStream;
+ final OutputStream out2 = outStream;
+ remoteId.getTicket().doAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws IOException {
+ saslRpcClient = new SaslRpcClient(authMethod, token,
+ serverPrincipal);
+ saslRpcClient.saslConnect(in2, out2);
+ return null;
+ }
+ });
+ inStream = saslRpcClient.getInputStream(inStream);
+ outStream = saslRpcClient.getOutputStream(outStream);
+ }
if (doPing) {
this.in = new DataInputStream(new BufferedInputStream
- (new PingInputStream(NetUtils.getInputStream(socket))));
+ (new PingInputStream(inStream)));
} else {
this.in = new DataInputStream(new BufferedInputStream
- (NetUtils.getInputStream(socket)));
+ (inStream));
}
this.out = new DataOutputStream
- (new BufferedOutputStream(NetUtils.getOutputStream(socket)));
+ (new BufferedOutputStream(outStream));
writeHeader();
// update last activity time
@@ -396,14 +474,20 @@
". Already tried " + curRetries + " time(s).");
}
- /* Write the header for each connection
+ /* Write the RPC header */
+ private void writeRpcHeader(OutputStream outStream) throws IOException {
+ DataOutputStream out = new DataOutputStream(new BufferedOutputStream(outStream));
+ // Write out the header, version and authentication method
+ out.write(Server.HEADER.array());
+ out.write(Server.CURRENT_VERSION);
+ authMethod.write(out);
+ out.flush();
+ }
+
+ /* Write the protocol header for each connection
* Out is not synchronized because only the first thread does this.
*/
private void writeHeader() throws IOException {
- // Write out the header and version
- out.write(Server.HEADER.array());
- out.write(Server.CURRENT_VERSION);
-
// Write out the ConnectionHeader
DataOutputBuffer buf = new DataOutputBuffer();
header.write(buf);
@@ -575,6 +659,7 @@
// close the streams and therefore the socket
IOUtils.closeStream(out);
IOUtils.closeStream(in);
+ disposeSasl();
// clean up all calls
if (closeException == null) {
@@ -815,7 +900,7 @@
*/
@Deprecated
public Writable[] call(Writable[] params, InetSocketAddress[] addresses)
- throws IOException {
+ throws IOException, InterruptedException {
return call(params, addresses, null, null);
}
@@ -825,7 +910,7 @@
* contains nulls for calls that timed out or errored. */
public Writable[] call(Writable[] params, InetSocketAddress[] addresses,
Class<?> protocol, UserGroupInformation ticket)
- throws IOException {
+ throws IOException, InterruptedException {
if (addresses.length == 0) return new Writable[0];
ParallelResults results = new ParallelResults(params.length);
@@ -859,7 +944,7 @@
Class<?> protocol,
UserGroupInformation ticket,
Call call)
- throws IOException {
+ throws IOException, InterruptedException {
if (!running.get()) {
// the client is stopped
throw new IOException("The client is stopped");
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RPC.java?rev=905860&r1=905859&r2=905860&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RPC.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RPC.java Wed Feb 3 01:30:25 2010
@@ -37,6 +37,8 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
import org.apache.hadoop.util.ReflectionUtils;
@@ -254,7 +256,7 @@
@Deprecated
public static Object[] call(Method method, Object[][] params,
InetSocketAddress[] addrs, Configuration conf)
- throws IOException {
+ throws IOException, InterruptedException {
return call(method, params, addrs, null, conf);
}
@@ -262,7 +264,7 @@
public static Object[] call(Method method, Object[][] params,
InetSocketAddress[] addrs,
UserGroupInformation ticket, Configuration conf)
- throws IOException {
+ throws IOException, InterruptedException {
return getProtocolEngine(method.getDeclaringClass(), conf)
.call(method, params, addrs, ticket, conf);
@@ -288,7 +290,7 @@
final boolean verbose, Configuration conf)
throws IOException {
return getServer(instance.getClass(), // use impl class for protocol
- instance, bindAddress, port, numHandlers, false, conf);
+ instance, bindAddress, port, numHandlers, false, conf, null);
}
/** Construct a server for a protocol implementation instance. */
@@ -296,19 +298,34 @@
Object instance, String bindAddress,
int port, Configuration conf)
throws IOException {
- return getServer(protocol, instance, bindAddress, port, 1, false, conf);
+ return getServer(protocol, instance, bindAddress, port, 1, false, conf, null);
}
- /** Construct a server for a protocol implementation instance. */
+ /** Construct a server for a protocol implementation instance.
+ * @deprecated secretManager should be passed.
+ */
+ @Deprecated
public static Server getServer(Class protocol,
Object instance, String bindAddress, int port,
int numHandlers,
boolean verbose, Configuration conf)
throws IOException {
+ return getServer(protocol, instance, bindAddress, port, numHandlers, verbose,
+ conf, null);
+ }
+
+ /** Construct a server for a protocol implementation instance. */
+ public static Server getServer(Class<?> protocol,
+ Object instance, String bindAddress, int port,
+ int numHandlers,
+ boolean verbose, Configuration conf,
+ SecretManager<? extends TokenIdentifier> secretManager)
+ throws IOException {
+
return getProtocolEngine(protocol, conf)
.getServer(protocol, instance, bindAddress, port, numHandlers, verbose,
- conf);
+ conf, secretManager);
}
/** An RPC Server. */
@@ -316,8 +333,9 @@
protected Server(String bindAddress, int port,
Class<? extends Writable> paramClass, int handlerCount,
- Configuration conf, String serverName) throws IOException {
- super(bindAddress, port, paramClass, handlerCount, conf, serverName);
+ Configuration conf, String serverName,
+ SecretManager<? extends TokenIdentifier> secretManager) throws IOException {
+ super(bindAddress, port, paramClass, handlerCount, conf, serverName, secretManager);
}
}
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RpcEngine.java?rev=905860&r1=905859&r2=905860&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RpcEngine.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RpcEngine.java Wed Feb 3 01:30:25 2010
@@ -24,6 +24,8 @@
import javax.net.SocketFactory;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.conf.Configuration;
/** An RPC implementation. */
@@ -41,11 +43,13 @@
/** Expert: Make multiple, parallel calls to a set of servers. */
Object[] call(Method method, Object[][] params, InetSocketAddress[] addrs,
UserGroupInformation ticket, Configuration conf)
- throws IOException;
+ throws IOException, InterruptedException;
/** Construct a server for a protocol implementation instance. */
RPC.Server getServer(Class protocol, Object instance, String bindAddress,
int port, int numHandlers, boolean verbose,
- Configuration conf) throws IOException;
+ Configuration conf,
+ SecretManager<? extends TokenIdentifier> secretManager
+ ) throws IOException;
}
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Server.java?rev=905860&r1=905859&r2=905860&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Server.java Wed Feb 3 01:30:25 2010
@@ -32,6 +32,7 @@
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
+import java.nio.channels.Channels;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
@@ -39,7 +40,6 @@
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
-import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
@@ -52,15 +52,26 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.ipc.metrics.RpcMetrics;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
+import org.apache.hadoop.security.SaslRpcServer.SaslDigestCallbackHandler;
+import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
@@ -80,7 +91,8 @@
// 1 : Introduce ping and server does not throw away RPCs
// 3 : Introduce the protocol into the RPC connection header
- public static final byte CURRENT_VERSION = 3;
+ // 4 : Introduced SASL security layer
+ public static final byte CURRENT_VERSION = 4;
/**
* How many calls/handler are allowed in the queue.
@@ -158,6 +170,7 @@
protected RpcMetrics rpcMetrics;
private Configuration conf;
+ private SecretManager<TokenIdentifier> secretManager;
private int maxQueueSize;
private int socketSendBufferSize;
@@ -431,7 +444,7 @@
if (count < 0) {
if (LOG.isDebugEnabled())
LOG.debug(getName() + ": disconnecting client " +
- c.getHostAddress() + ". Number of active connections: "+
+ c + ". Number of active connections: "+
numConnections);
closeConnection(c);
c = null;
@@ -703,8 +716,7 @@
/** Reads calls from a connection and queues them for handling. */
private class Connection {
- private boolean versionRead = false; //if initial signature and
- //version are read
+ private boolean rpcHeaderRead = false; // if initial rpc header is read
private boolean headerRead = false; //if the connection header that
//follows version is read.
@@ -723,6 +735,13 @@
ConnectionHeader header = new ConnectionHeader();
Class<?> protocol;
+ boolean useSasl;
+ SaslServer saslServer;
+ private AuthMethod authMethod;
+ private boolean saslContextEstablished;
+ private ByteBuffer rpcHeaderBuffer;
+ private ByteBuffer unwrappedData;
+ private ByteBuffer unwrappedDataLengthBuffer;
UserGroupInformation user = null;
@@ -731,6 +750,10 @@
private final Call authFailedCall =
new Call(AUTHROIZATION_FAILED_CALLID, null, null);
private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream();
+ // Fake 'call' for SASL context setup
+ private static final int SASL_CALLID = -33;
+ private final Call saslCall = new Call(SASL_CALLID, null, null);
+ private final ByteArrayOutputStream saslResponse = new ByteArrayOutputStream();
public Connection(SelectionKey key, SocketChannel channel,
long lastContact) {
@@ -738,6 +761,8 @@
this.lastContact = lastContact;
this.data = null;
this.dataLengthBuffer = ByteBuffer.allocate(4);
+ this.unwrappedData = null;
+ this.unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
this.socket = channel.socket();
InetAddress addr = socket.getInetAddress();
if (addr == null) {
@@ -795,6 +820,92 @@
return false;
}
+ private void saslReadAndProcess(byte[] saslToken) throws IOException,
+ InterruptedException {
+ if (!saslContextEstablished) {
+ if (saslServer == null) {
+ switch (authMethod) {
+ case DIGEST:
+ saslServer = Sasl.createSaslServer(AuthMethod.DIGEST
+ .getMechanismName(), null, SaslRpcServer.SASL_DEFAULT_REALM,
+ SaslRpcServer.SASL_PROPS, new SaslDigestCallbackHandler(
+ secretManager));
+ break;
+ default:
+ UserGroupInformation current = UserGroupInformation
+ .getCurrentUser();
+ String fullName = current.getUserName();
+ if (LOG.isDebugEnabled())
+ LOG.debug("Kerberos principal name is " + fullName);
+ final String names[] = SaslRpcServer.splitKerberosName(fullName);
+ if (names.length != 3) {
+ throw new IOException(
+ "Kerberos principal name does NOT have the expected "
+ + "hostname part: " + fullName);
+ }
+ current.doAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws IOException {
+ saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS
+ .getMechanismName(), names[0], names[1],
+ SaslRpcServer.SASL_PROPS, new SaslGssCallbackHandler());
+ return null;
+ }
+ });
+ }
+ if (saslServer == null)
+ throw new IOException(
+ "Unable to find SASL server implementation for "
+ + authMethod.getMechanismName());
+ if (LOG.isDebugEnabled())
+ LOG.debug("Created SASL server with mechanism = "
+ + authMethod.getMechanismName());
+ }
+ if (LOG.isDebugEnabled())
+ LOG.debug("Have read input token of size " + saslToken.length
+ + " for processing by saslServer.evaluateResponse()");
+ byte[] replyToken = saslServer.evaluateResponse(saslToken);
+ if (replyToken != null) {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Will send token of size " + replyToken.length
+ + " from saslServer.");
+ saslCall.connection = this;
+ saslResponse.reset();
+ DataOutputStream out = new DataOutputStream(saslResponse);
+ out.writeInt(replyToken.length);
+ out.write(replyToken, 0, replyToken.length);
+ saslCall.setResponse(ByteBuffer.wrap(saslResponse.toByteArray()));
+ responder.doRespond(saslCall);
+ }
+ if (saslServer.isComplete()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("SASL server context established. Negotiated QoP is "
+ + saslServer.getNegotiatedProperty(Sasl.QOP));
+ }
+ user = UserGroupInformation.createRemoteUser(saslServer
+ .getAuthorizationID());
+ LOG.info("SASL server successfully authenticated client: " + user);
+ saslContextEstablished = true;
+ }
+ } else {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Have read input token of size " + saslToken.length
+ + " for processing by saslServer.unwrap()");
+ byte[] plaintextData = saslServer
+ .unwrap(saslToken, 0, saslToken.length);
+ processUnwrappedData(plaintextData);
+ }
+ }
+
+ private void disposeSasl() {
+ if (saslServer != null) {
+ try {
+ saslServer.dispose();
+ } catch (SaslException ignored) {
+ }
+ }
+ }
+
public int readAndProcess() throws IOException, InterruptedException {
while (true) {
/* Read at most one RPC. If the header is not read completely yet
@@ -807,14 +918,33 @@
return count;
}
- if (!versionRead) {
+ if (!rpcHeaderRead) {
//Every connection is expected to send the header.
- ByteBuffer versionBuffer = ByteBuffer.allocate(1);
- count = channelRead(channel, versionBuffer);
- if (count <= 0) {
+ if (rpcHeaderBuffer == null) {
+ rpcHeaderBuffer = ByteBuffer.allocate(2);
+ }
+ count = channelRead(channel, rpcHeaderBuffer);
+ if (count < 0 || rpcHeaderBuffer.remaining() > 0) {
return count;
}
- int version = versionBuffer.get(0);
+ int version = rpcHeaderBuffer.get(0);
+ byte[] method = new byte[] {rpcHeaderBuffer.get(1)};
+ authMethod = AuthMethod.read(new DataInputStream(
+ new ByteArrayInputStream(method)));
+ if (authMethod == null) {
+ throw new IOException("Unable to read authentication method");
+ }
+ if (UserGroupInformation.isSecurityEnabled()
+ && authMethod == AuthMethod.SIMPLE) {
+ throw new IOException("Authentication is required");
+ }
+ if (!UserGroupInformation.isSecurityEnabled()
+ && authMethod != AuthMethod.SIMPLE) {
+ throw new IOException("Authentication is not supported");
+ }
+ if (authMethod != AuthMethod.SIMPLE) {
+ useSasl = true;
+ }
dataLengthBuffer.flip();
if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) {
@@ -826,7 +956,8 @@
return -1;
}
dataLengthBuffer.clear();
- versionRead = true;
+ rpcHeaderBuffer = null;
+ rpcHeaderRead = true;
continue;
}
@@ -834,12 +965,11 @@
dataLengthBuffer.flip();
dataLength = dataLengthBuffer.getInt();
- if (dataLength == Client.PING_CALL_ID) {
+ if (!useSasl && dataLength == Client.PING_CALL_ID) {
dataLengthBuffer.clear();
return 0; //ping message
}
data = ByteBuffer.allocate(dataLength);
- incRpcCount(); // Increment the rpc count
}
count = channelRead(channel, data);
@@ -847,33 +977,14 @@
if (data.remaining() == 0) {
dataLengthBuffer.clear();
data.flip();
- if (headerRead) {
- processData();
- data = null;
- return count;
+ boolean isHeaderRead = headerRead;
+ if (useSasl) {
+ saslReadAndProcess(data.array());
} else {
- processHeader();
- headerRead = true;
- data = null;
-
- // Authorize the connection
- try {
- authorize(user, header);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Successfully authorized " + header);
- }
- } catch (AuthorizationException ae) {
- authFailedCall.connection = this;
- setupResponse(authFailedResponse, authFailedCall,
- Status.FATAL, null,
- ae.getClass().getName(), ae.getMessage());
- responder.doRespond(authFailedCall);
-
- // Close this connection
- return -1;
- }
-
+ processOneRpc(data.array());
+ }
+ data = null;
+ if (!isHeaderRead) {
continue;
}
}
@@ -882,9 +993,9 @@
}
/// Reads the connection header following version
- private void processHeader() throws IOException {
+ private void processHeader(byte[] buf) throws IOException {
DataInputStream in =
- new DataInputStream(new ByteArrayInputStream(data.array()));
+ new DataInputStream(new ByteArrayInputStream(buf));
header.readFields(in);
try {
String protocolClassName = header.getProtocol();
@@ -895,12 +1006,73 @@
throw new IOException("Unknown protocol: " + header.getProtocol());
}
- user = header.getUgi();
+ UserGroupInformation protocolUser = header.getUgi();
+ if (!useSasl) {
+ user = protocolUser;
+ } else if (protocolUser != null && !protocolUser.equals(user)) {
+ throw new AccessControlException("Authenticated user (" + user
+ + ") doesn't match what the client claims to be (" + protocolUser
+ + ")");
+ }
}
- private void processData() throws IOException, InterruptedException {
+ private void processUnwrappedData(byte[] inBuf) throws IOException,
+ InterruptedException {
+ ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(
+ inBuf));
+ // Read all RPCs contained in the inBuf, even partial ones
+ while (true) {
+ int count = -1;
+ if (unwrappedDataLengthBuffer.remaining() > 0) {
+ count = channelRead(ch, unwrappedDataLengthBuffer);
+ if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0)
+ return;
+ }
+
+ if (unwrappedData == null) {
+ unwrappedDataLengthBuffer.flip();
+ int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
+
+ if (unwrappedDataLength == Client.PING_CALL_ID) {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Received ping message");
+ unwrappedDataLengthBuffer.clear();
+ continue; // ping message
+ }
+ unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
+ }
+
+ count = channelRead(ch, unwrappedData);
+ if (count <= 0 || unwrappedData.remaining() > 0)
+ return;
+
+ if (unwrappedData.remaining() == 0) {
+ unwrappedDataLengthBuffer.clear();
+ unwrappedData.flip();
+ processOneRpc(unwrappedData.array());
+ unwrappedData = null;
+ }
+ }
+ }
+
+ private void processOneRpc(byte[] buf) throws IOException,
+ InterruptedException {
+ if (headerRead) {
+ processData(buf);
+ } else {
+ processHeader(buf);
+ headerRead = true;
+ if (!authorizeConnection()) {
+ throw new AccessControlException("Connection from " + this
+ + " for protocol " + header.getProtocol()
+ + " is unauthorized for user " + user);
+ }
+ }
+ }
+
+ private void processData(byte[] buf) throws IOException, InterruptedException {
DataInputStream dis =
- new DataInputStream(new ByteArrayInputStream(data.array()));
+ new DataInputStream(new ByteArrayInputStream(buf));
int id = dis.readInt(); // try to read an id
if (LOG.isDebugEnabled())
@@ -911,9 +1083,27 @@
Call call = new Call(id, param, this);
callQueue.put(call); // queue the call; maybe blocked here
+ incRpcCount(); // Increment the rpc count
}
+ private boolean authorizeConnection() throws IOException {
+ try {
+ authorize(user, header);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Successfully authorized " + header);
+ }
+ } catch (AuthorizationException ae) {
+ authFailedCall.connection = this;
+ setupResponse(authFailedResponse, authFailedCall, Status.FATAL, null,
+ ae.getClass().getName(), ae.getMessage());
+ responder.doRespond(authFailedCall);
+ return false;
+ }
+ return true;
+ }
+
private synchronized void close() throws IOException {
+ disposeSasl();
data = null;
dataLengthBuffer = null;
if (!channel.isOpen())
@@ -1011,16 +1201,17 @@
Configuration conf)
throws IOException
{
- this(bindAddress, port, paramClass, handlerCount, conf, Integer.toString(port));
+ this(bindAddress, port, paramClass, handlerCount, conf, Integer.toString(port), null);
}
/** Constructs a server listening on the named port and address. Parameters passed must
* be of the named class. The <code>handlerCount</handlerCount> determines
* the number of handler threads that will be used to process calls.
*
*/
+ @SuppressWarnings("unchecked")
protected Server(String bindAddress, int port,
Class<? extends Writable> paramClass, int handlerCount,
- Configuration conf, String serverName)
+ Configuration conf, String serverName, SecretManager<? extends TokenIdentifier> secretManager)
throws IOException {
this.bindAddress = bindAddress;
this.conf = conf;
@@ -1033,6 +1224,7 @@
this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000);
this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);
+ this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
this.authorize =
conf.getBoolean(ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG,
false);
@@ -1086,9 +1278,29 @@
WritableUtils.writeString(out, errorClass);
WritableUtils.writeString(out, error);
}
+ wrapWithSasl(response, call);
call.setResponse(ByteBuffer.wrap(response.toByteArray()));
}
+ private void wrapWithSasl(ByteArrayOutputStream response, Call call)
+ throws IOException {
+ if (call.connection.useSasl) {
+ byte[] token = response.toByteArray();
+ // synchronization may be needed since there can be multiple Handler
+ // threads using saslServer to wrap responses.
+ synchronized (call.connection.saslServer) {
+ token = call.connection.saslServer.wrap(token, 0, token.length);
+ }
+ if (LOG.isDebugEnabled())
+ LOG.debug("Adding saslServer wrapped token of size " + token.length
+ + " as call response.");
+ response.reset();
+ DataOutputStream saslOut = new DataOutputStream(response);
+ saslOut.writeInt(token.length);
+ saslOut.write(token, 0, token.length);
+ }
+ }
+
Configuration getConf() {
return conf;
}
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java?rev=905860&r1=905859&r2=905860&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java Wed Feb 3 01:30:25 2010
@@ -36,6 +36,8 @@
import org.apache.hadoop.io.*;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
@@ -246,7 +248,7 @@
public Object[] call(Method method, Object[][] params,
InetSocketAddress[] addrs,
UserGroupInformation ticket, Configuration conf)
- throws IOException {
+ throws IOException, InterruptedException {
Invocation[] invocations = new Invocation[params.length];
for (int i = 0; i < params.length; i++)
@@ -276,9 +278,11 @@
* port and address. */
public Server getServer(Class protocol,
Object instance, String bindAddress, int port,
- int numHandlers, boolean verbose, Configuration conf)
+ int numHandlers, boolean verbose, Configuration conf,
+ SecretManager<? extends TokenIdentifier> secretManager)
throws IOException {
- return new Server(instance, conf, bindAddress, port, numHandlers, verbose);
+ return new Server(instance, conf, bindAddress, port, numHandlers,
+ verbose, secretManager);
}
/** An RPC Server. */
@@ -294,7 +298,7 @@
*/
public Server(Object instance, Configuration conf, String bindAddress, int port)
throws IOException {
- this(instance, conf, bindAddress, port, 1, false);
+ this(instance, conf, bindAddress, port, 1, false, null);
}
private static String classNameBase(String className) {
@@ -314,8 +318,11 @@
* @param verbose whether each call should be logged
*/
public Server(Object instance, Configuration conf, String bindAddress, int port,
- int numHandlers, boolean verbose) throws IOException {
- super(bindAddress, port, Invocation.class, numHandlers, conf, classNameBase(instance.getClass().getName()));
+ int numHandlers, boolean verbose,
+ SecretManager<? extends TokenIdentifier> secretManager)
+ throws IOException {
+ super(bindAddress, port, Invocation.class, numHandlers, conf,
+ classNameBase(instance.getClass().getName()), secretManager);
this.instance = instance;
this.verbose = verbose;
}
Added: hadoop/common/trunk/src/java/org/apache/hadoop/security/KerberosInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/security/KerberosInfo.java?rev=905860&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/security/KerberosInfo.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/security/KerberosInfo.java Wed Feb 3 01:30:25 2010
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security;
+
+import java.lang.annotation.*;
+
+/**
+ * Indicates Kerberos related information to be used
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface KerberosInfo {
+ /** Key for getting server's Kerberos principal name from Configuration */
+ String value();
+}
Added: hadoop/common/trunk/src/java/org/apache/hadoop/security/SaslInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/security/SaslInputStream.java?rev=905860&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/security/SaslInputStream.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/security/SaslInputStream.java Wed Feb 3 01:30:25 2010
@@ -0,0 +1,321 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.InputStream;
+import java.io.IOException;
+
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A SaslInputStream is composed of an InputStream and a SaslServer (or
+ * SaslClient) so that read() methods return data that are read in from the
+ * underlying InputStream but have been additionally processed by the SaslServer
+ * (or SaslClient) object. The SaslServer (or SaslClient) object must be fully
+ * initialized before being used by a SaslInputStream.
+ */
+public class SaslInputStream extends InputStream {
+ public static final Log LOG = LogFactory.getLog(SaslInputStream.class);
+
+ private final DataInputStream inStream;
+ /*
+ * data read from the underlying input stream before being processed by SASL
+ */
+ private byte[] saslToken;
+ private final SaslClient saslClient;
+ private final SaslServer saslServer;
+ private byte[] lengthBuf = new byte[4];
+ /*
+ * buffer holding data that have been processed by SASL, but have not been
+ * read out
+ */
+ private byte[] obuffer;
+ // position of the next "new" byte
+ private int ostart = 0;
+ // position one past the last "new" byte (exclusive end of valid data)
+ private int ofinish = 0;
+
+ private static int unsignedBytesToInt(byte[] buf) {
+ if (buf.length != 4) {
+ throw new IllegalArgumentException(
+ "Cannot handle byte array other than 4 bytes");
+ }
+ int result = 0;
+ for (int i = 0; i < 4; i++) {
+ result <<= 8;
+ result |= ((int) buf[i] & 0xff);
+ }
+ return result;
+ }
+
+ /**
+ * Read more data and get them processed <br>
+ * Entry condition: ostart = ofinish <br>
+ * Exit condition: ostart <= ofinish <br>
+ *
+ * @return (ofinish-ostart) (we have this many bytes for you), 0 (no data
+ * now, but could have more later), or -1 (absolutely no more data)
+ */
+ private int readMoreData() throws IOException {
+ try {
+ inStream.readFully(lengthBuf);
+ int length = unsignedBytesToInt(lengthBuf);
+ if (LOG.isDebugEnabled())
+ LOG.debug("Actual length is " + length);
+ saslToken = new byte[length];
+ inStream.readFully(saslToken);
+ } catch (EOFException e) {
+ return -1;
+ }
+ try {
+ if (saslServer != null) { // using saslServer
+ obuffer = saslServer.unwrap(saslToken, 0, saslToken.length);
+ } else { // using saslClient
+ obuffer = saslClient.unwrap(saslToken, 0, saslToken.length);
+ }
+ } catch (SaslException se) {
+ try {
+ disposeSasl();
+ } catch (SaslException ignored) {
+ }
+ throw se;
+ }
+ ostart = 0;
+ if (obuffer == null)
+ ofinish = 0;
+ else
+ ofinish = obuffer.length;
+ return ofinish;
+ }
+
+ /**
+ * Disposes of any system resources or security-sensitive information Sasl
+ * might be using.
+ *
+ * @exception SaslException
+ * if a SASL error occurs.
+ */
+ private void disposeSasl() throws SaslException {
+ if (saslClient != null) {
+ saslClient.dispose();
+ }
+ if (saslServer != null) {
+ saslServer.dispose();
+ }
+ }
+
+ /**
+ * Constructs a SaslInputStream from an InputStream and a SaslServer <br>
+ * Note: if the specified InputStream or SaslServer is null, a
+ * NullPointerException may be thrown later when they are used.
+ *
+ * @param inStream
+ * the InputStream to be processed
+ * @param saslServer
+ * an initialized SaslServer object
+ */
+ public SaslInputStream(InputStream inStream, SaslServer saslServer) {
+ this.inStream = new DataInputStream(inStream);
+ this.saslServer = saslServer;
+ this.saslClient = null;
+ }
+
+ /**
+ * Constructs a SaslInputStream from an InputStream and a SaslClient <br>
+ * Note: if the specified InputStream or SaslClient is null, a
+ * NullPointerException may be thrown later when they are used.
+ *
+ * @param inStream
+ * the InputStream to be processed
+ * @param saslClient
+ * an initialized SaslClient object
+ */
+ public SaslInputStream(InputStream inStream, SaslClient saslClient) {
+ this.inStream = new DataInputStream(inStream);
+ this.saslServer = null;
+ this.saslClient = saslClient;
+ }
+
+ /**
+ * Reads the next byte of data from this input stream. The value byte is
+ * returned as an <code>int</code> in the range <code>0</code> to
+ * <code>255</code>. If no byte is available because the end of the stream has
+ * been reached, the value <code>-1</code> is returned. This method blocks
+ * until input data is available, the end of the stream is detected, or an
+ * exception is thrown.
+ * <p>
+ *
+ * @return the next byte of data, or <code>-1</code> if the end of the stream
+ * is reached.
+ * @exception IOException
+ * if an I/O error occurs.
+ */
+ public int read() throws IOException {
+ if (ostart >= ofinish) {
+ // we loop for new data as we are blocking
+ int i = 0;
+ while (i == 0)
+ i = readMoreData();
+ if (i == -1)
+ return -1;
+ }
+ return ((int) obuffer[ostart++] & 0xff);
+ }
+
+ /**
+ * Reads up to <code>b.length</code> bytes of data from this input stream into
+ * an array of bytes.
+ * <p>
+ * The <code>read</code> method of <code>InputStream</code> calls the
+ * <code>read</code> method of three arguments with the arguments
+ * <code>b</code>, <code>0</code>, and <code>b.length</code>.
+ *
+ * @param b
+ * the buffer into which the data is read.
+ * @return the total number of bytes read into the buffer, or <code>-1</code>
+ * if there is no more data because the end of the stream has been
+ * reached.
+ * @exception IOException
+ * if an I/O error occurs.
+ */
+ public int read(byte[] b) throws IOException {
+ return read(b, 0, b.length);
+ }
+
+ /**
+ * Reads up to <code>len</code> bytes of data from this input stream into an
+ * array of bytes. This method blocks until some input is available. If the
+ * first argument is <code>null,</code> up to <code>len</code> bytes are read
+ * and discarded.
+ *
+ * @param b
+ * the buffer into which the data is read.
+ * @param off
+ * the start offset of the data.
+ * @param len
+ * the maximum number of bytes read.
+ * @return the total number of bytes read into the buffer, or <code>-1</code>
+ * if there is no more data because the end of the stream has been
+ * reached.
+ * @exception IOException
+ * if an I/O error occurs.
+ */
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (ostart >= ofinish) {
+ // we loop for new data as we are blocking
+ int i = 0;
+ while (i == 0)
+ i = readMoreData();
+ if (i == -1)
+ return -1;
+ }
+ if (len <= 0) {
+ return 0;
+ }
+ int available = ofinish - ostart;
+ if (len < available)
+ available = len;
+ if (b != null) {
+ System.arraycopy(obuffer, ostart, b, off, available);
+ }
+ ostart = ostart + available;
+ return available;
+ }
+
+ /**
+ * Skips <code>n</code> bytes of input from the bytes that can be read from
+ * this input stream without blocking.
+ *
+ * <p>
+ * Fewer bytes than requested might be skipped. The actual number of bytes
+ * skipped is equal to <code>n</code> or the result of a call to
+ * {@link #available() <code>available</code>}, whichever is smaller. If
+ * <code>n</code> is less than zero, no bytes are skipped.
+ *
+ * <p>
+ * The actual number of bytes skipped is returned.
+ *
+ * @param n
+ * the number of bytes to be skipped.
+ * @return the actual number of bytes skipped.
+ * @exception IOException
+ * if an I/O error occurs.
+ */
+ public long skip(long n) throws IOException {
+ int available = ofinish - ostart;
+ if (n > available) {
+ n = available;
+ }
+ if (n < 0) {
+ return 0;
+ }
+ ostart += n;
+ return n;
+ }
+
+ /**
+ * Returns the number of bytes that can be read from this input stream without
+ * blocking. The <code>available</code> method of <code>InputStream</code>
+ * returns <code>0</code>. This method <B>should</B> be overridden by
+ * subclasses.
+ *
+ * @return the number of bytes that can be read from this input stream without
+ * blocking.
+ * @exception IOException
+ * if an I/O error occurs.
+ */
+ public int available() throws IOException {
+ return (ofinish - ostart);
+ }
+
+ /**
+ * Closes this input stream and releases any system resources associated with
+ * the stream.
+ * <p>
+ * The <code>close</code> method of <code>SaslInputStream</code> calls the
+ * <code>close</code> method of its underlying input stream.
+ *
+ * @exception IOException
+ * if an I/O error occurs.
+ */
+ public void close() throws IOException {
+ disposeSasl();
+ ostart = 0;
+ ofinish = 0;
+ inStream.close();
+ }
+
+ /**
+ * Tests if this input stream supports the <code>mark</code> and
+ * <code>reset</code> methods, which it does not.
+ *
+ * @return <code>false</code>, since this class does not support the
+ * <code>mark</code> and <code>reset</code> methods.
+ */
+ public boolean markSupported() {
+ return false;
+ }
+}
\ No newline at end of file
Added: hadoop/common/trunk/src/java/org/apache/hadoop/security/SaslOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/security/SaslOutputStream.java?rev=905860&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/security/SaslOutputStream.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/security/SaslOutputStream.java Wed Feb 3 01:30:25 2010
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+/**
+ * A SaslOutputStream is composed of an OutputStream and a SaslServer (or
+ * SaslClient) so that write() methods first process the data before writing
+ * them out to the underlying OutputStream. The SaslServer (or SaslClient)
+ * object must be fully initialized before being used by a SaslOutputStream.
+ */
+public class SaslOutputStream extends OutputStream {
+
+ private final DataOutputStream outStream;
+ // processed data ready to be written out
+ private byte[] saslToken;
+
+ private final SaslClient saslClient;
+ private final SaslServer saslServer;
+ // buffer holding one byte of incoming data
+ private final byte[] ibuffer = new byte[1];
+
+ /**
+ * Constructs a SaslOutputStream from an OutputStream and a SaslServer <br>
+ * Note: if the specified OutputStream or SaslServer is null, a
+ * NullPointerException may be thrown later when they are used.
+ *
+ * @param outStream
+ * the OutputStream to be processed
+ * @param saslServer
+ * an initialized SaslServer object
+ */
+ public SaslOutputStream(OutputStream outStream, SaslServer saslServer) {
+ this.outStream = new DataOutputStream(outStream);
+ this.saslServer = saslServer;
+ this.saslClient = null;
+ }
+
+ /**
+ * Constructs a SaslOutputStream from an OutputStream and a SaslClient <br>
+ * Note: if the specified OutputStream or SaslClient is null, a
+ * NullPointerException may be thrown later when they are used.
+ *
+ * @param outStream
+ * the OutputStream to be processed
+ * @param saslClient
+ * an initialized SaslClient object
+ */
+ public SaslOutputStream(OutputStream outStream, SaslClient saslClient) {
+ this.outStream = new DataOutputStream(outStream);
+ this.saslServer = null;
+ this.saslClient = saslClient;
+ }
+
+ /**
+ * Disposes of any system resources or security-sensitive information Sasl
+ * might be using.
+ *
+ * @exception SaslException
+ * if a SASL error occurs.
+ */
+ private void disposeSasl() throws SaslException {
+ if (saslClient != null) {
+ saslClient.dispose();
+ }
+ if (saslServer != null) {
+ saslServer.dispose();
+ }
+ }
+
+ /**
+ * Writes the specified byte to this output stream.
+ *
+ * @param b
+ * the <code>byte</code>.
+ * @exception IOException
+ * if an I/O error occurs.
+ */
+ public void write(int b) throws IOException {
+ ibuffer[0] = (byte) b;
+ write(ibuffer, 0, 1);
+ }
+
+ /**
+ * Writes <code>b.length</code> bytes from the specified byte array to this
+ * output stream.
+ * <p>
+ * The <code>write</code> method of <code>SaslOutputStream</code> calls the
+ * <code>write</code> method of three arguments with the three arguments
+ * <code>b</code>, <code>0</code>, and <code>b.length</code>.
+ *
+ * @param b
+ * the data.
+ * @exception NullPointerException
+ * if <code>b</code> is null.
+ * @exception IOException
+ * if an I/O error occurs.
+ */
+ public void write(byte[] b) throws IOException {
+ write(b, 0, b.length);
+ }
+
+ /**
+ * Writes <code>len</code> bytes from the specified byte array starting at
+ * offset <code>off</code> to this output stream.
+ *
+ * @param inBuf
+ * the data.
+ * @param off
+ * the start offset in the data.
+ * @param len
+ * the number of bytes to write.
+ * @exception IOException
+ * if an I/O error occurs.
+ */
+ public void write(byte[] inBuf, int off, int len) throws IOException {
+ try {
+ if (saslServer != null) { // using saslServer
+ saslToken = saslServer.wrap(inBuf, off, len);
+ } else { // using saslClient
+ saslToken = saslClient.wrap(inBuf, off, len);
+ }
+ } catch (SaslException se) {
+ try {
+ disposeSasl();
+ } catch (SaslException ignored) {
+ }
+ throw se;
+ }
+ if (saslToken != null) {
+ outStream.writeInt(saslToken.length);
+ outStream.write(saslToken, 0, saslToken.length);
+ saslToken = null;
+ }
+ }
+
+ /**
+ * Flushes this output stream
+ *
+ * @exception IOException
+ * if an I/O error occurs.
+ */
+ public void flush() throws IOException {
+ outStream.flush();
+ }
+
+ /**
+ * Closes this output stream and releases any system resources associated with
+ * this stream.
+ *
+ * @exception IOException
+ * if an I/O error occurs.
+ */
+ public void close() throws IOException {
+ disposeSasl();
+ outStream.close();
+ }
+}
\ No newline at end of file
Added: hadoop/common/trunk/src/java/org/apache/hadoop/security/SaslRpcClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/security/SaslRpcClient.java?rev=905860&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/security/SaslRpcClient.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/security/SaslRpcClient.java Wed Feb 3 01:30:25 2010
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.RealmChoiceCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslClient;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+
+/**
+ * A utility class that encapsulates SASL logic for RPC client
+ */
+public class SaslRpcClient {
+ public static final Log LOG = LogFactory.getLog(SaslRpcClient.class);
+
+ private final SaslClient saslClient;
+
+ /**
+ * Create a SaslRpcClient for an authentication method
+ *
+ * @param method
+ * the requested authentication method
+ * @param token
+ * token to use if needed by the authentication method
+ */
+ public SaslRpcClient(AuthMethod method,
+ Token<? extends TokenIdentifier> token, String serverPrincipal)
+ throws IOException {
+ switch (method) {
+ case DIGEST:
+ if (LOG.isDebugEnabled())
+ LOG.debug("Creating SASL " + AuthMethod.DIGEST.getMechanismName()
+ + " client to authenticate to service at " + token.getService());
+ saslClient = Sasl.createSaslClient(new String[] { AuthMethod.DIGEST
+ .getMechanismName() }, null, null, SaslRpcServer.SASL_DEFAULT_REALM,
+ SaslRpcServer.SASL_PROPS, new SaslClientCallbackHandler(token));
+ break;
+ case KERBEROS:
+ if (LOG.isDebugEnabled()) {
+ LOG
+ .debug("Creating SASL " + AuthMethod.KERBEROS.getMechanismName()
+ + " client. Server's Kerberos principal name is "
+ + serverPrincipal);
+ }
+ if (serverPrincipal == null || serverPrincipal.length() == 0) {
+ throw new IOException(
+ "Failed to specify server's Kerberos principal name");
+ }
+ String names[] = SaslRpcServer.splitKerberosName(serverPrincipal);
+ if (names.length != 3) {
+ throw new IOException(
+ "Kerberos principal name does NOT have the expected hostname part: "
+ + serverPrincipal);
+ }
+ saslClient = Sasl.createSaslClient(new String[] { AuthMethod.KERBEROS
+ .getMechanismName() }, null, names[0], names[1],
+ SaslRpcServer.SASL_PROPS, null);
+ break;
+ default:
+ throw new IOException("Unknown authentication method " + method);
+ }
+ if (saslClient == null)
+ throw new IOException("Unable to find SASL client implementation");
+ }
+
+ /**
+ * Do client side SASL authentication with server via the given InputStream
+ * and OutputStream
+ *
+ * @param inS
+ * InputStream to use
+ * @param outS
+ * OutputStream to use
+ * @throws IOException
+ */
+ public void saslConnect(InputStream inS, OutputStream outS)
+ throws IOException {
+ DataInputStream inStream = new DataInputStream(new BufferedInputStream(inS));
+ DataOutputStream outStream = new DataOutputStream(new BufferedOutputStream(
+ outS));
+
+ try {
+ byte[] saslToken = new byte[0];
+ if (saslClient.hasInitialResponse())
+ saslToken = saslClient.evaluateChallenge(saslToken);
+ if (saslToken != null) {
+ outStream.writeInt(saslToken.length);
+ outStream.write(saslToken, 0, saslToken.length);
+ outStream.flush();
+ if (LOG.isDebugEnabled())
+ LOG.debug("Have sent token of size " + saslToken.length
+ + " from initSASLContext.");
+ }
+ if (!saslClient.isComplete()) {
+ saslToken = new byte[inStream.readInt()];
+ if (LOG.isDebugEnabled())
+ LOG.debug("Will read input token of size " + saslToken.length
+ + " for processing by initSASLContext");
+ inStream.readFully(saslToken);
+ }
+
+ while (!saslClient.isComplete()) {
+ saslToken = saslClient.evaluateChallenge(saslToken);
+ if (saslToken != null) {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Will send token of size " + saslToken.length
+ + " from initSASLContext.");
+ outStream.writeInt(saslToken.length);
+ outStream.write(saslToken, 0, saslToken.length);
+ outStream.flush();
+ }
+ if (!saslClient.isComplete()) {
+ saslToken = new byte[inStream.readInt()];
+ if (LOG.isDebugEnabled())
+ LOG.debug("Will read input token of size " + saslToken.length
+ + " for processing by initSASLContext");
+ inStream.readFully(saslToken);
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("SASL client context established. Negotiated QoP: "
+ + saslClient.getNegotiatedProperty(Sasl.QOP));
+ }
+ } catch (IOException e) {
+ saslClient.dispose();
+ throw e;
+ }
+ }
+
+ /**
+ * Get a SASL wrapped InputStream. Can be called only after saslConnect() has
+ * been called.
+ *
+ * @param in
+ * the InputStream to wrap
+ * @return a SASL wrapped InputStream
+ * @throws IOException
+ */
+ public InputStream getInputStream(InputStream in) throws IOException {
+ if (!saslClient.isComplete()) {
+ throw new IOException("Sasl authentication exchange hasn't completed yet");
+ }
+ return new SaslInputStream(in, saslClient);
+ }
+
+ /**
+ * Get a SASL wrapped OutputStream. Can be called only after saslConnect() has
+ * been called.
+ *
+ * @param out
+ * the OutputStream to wrap
+ * @return a SASL wrapped OutputStream
+ * @throws IOException
+ */
+ public OutputStream getOutputStream(OutputStream out) throws IOException {
+ if (!saslClient.isComplete()) {
+ throw new IOException("Sasl authentication exchange hasn't completed yet");
+ }
+ return new SaslOutputStream(out, saslClient);
+ }
+
+ /** Release resources used by wrapped saslClient */
+ public void dispose() throws SaslException {
+ saslClient.dispose();
+ }
+
+ private static class SaslClientCallbackHandler implements CallbackHandler {
+ private final String userName;
+ private final char[] userPassword;
+
+ public SaslClientCallbackHandler(Token<? extends TokenIdentifier> token) {
+ this.userName = SaslRpcServer.encodeIdentifier(token.getIdentifier());
+ this.userPassword = SaslRpcServer.encodePassword(token.getPassword());
+ }
+
+ public void handle(Callback[] callbacks)
+ throws UnsupportedCallbackException {
+ NameCallback nc = null;
+ PasswordCallback pc = null;
+ RealmCallback rc = null;
+ for (Callback callback : callbacks) {
+ if (callback instanceof RealmChoiceCallback) {
+ continue;
+ } else if (callback instanceof NameCallback) {
+ nc = (NameCallback) callback;
+ } else if (callback instanceof PasswordCallback) {
+ pc = (PasswordCallback) callback;
+ } else if (callback instanceof RealmCallback) {
+ rc = (RealmCallback) callback;
+ } else {
+ throw new UnsupportedCallbackException(callback,
+ "Unrecognized SASL client callback");
+ }
+ }
+ if (nc != null) {
+ if (LOG.isDebugEnabled())
+ LOG.debug("SASL client callback: setting username: " + userName);
+ nc.setName(userName);
+ }
+ if (pc != null) {
+ if (LOG.isDebugEnabled())
+ LOG.debug("SASL client callback: setting userPassword");
+ pc.setPassword(userPassword);
+ }
+ if (rc != null) {
+ if (LOG.isDebugEnabled())
+ LOG.debug("SASL client callback: setting realm: "
+ + rc.getDefaultText());
+ rc.setText(rc.getDefaultText());
+ }
+ }
+ }
+}
Added: hadoop/common/trunk/src/java/org/apache/hadoop/security/SaslRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/security/SaslRpcServer.java?rev=905860&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/security/SaslRpcServer.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/security/SaslRpcServer.java Wed Feb 3 01:30:25 2010
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.TreeMap;
+import java.util.Map;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.Sasl;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.TokenIdentifier;
+
+/**
+ * A utility class for dealing with SASL on the RPC server: shared SASL
+ * properties, Base64 helpers for token identifiers and passwords, the
+ * {@link AuthMethod} wire codes, and the server-side callback handlers.
+ */
+public class SaslRpcServer {
+  public static final Log LOG = LogFactory.getLog(SaslRpcServer.class);
+  public static final String SASL_DEFAULT_REALM = "default";
+  /** SASL properties requesting "auth-int" QOP and server authentication. */
+  public static final Map<String, String> SASL_PROPS =
+      new TreeMap<String, String>();
+  static {
+    // Request authentication plus integrity protection
+    SASL_PROPS.put(Sasl.QOP, "auth-int");
+    // Request mutual authentication
+    SASL_PROPS.put(Sasl.SERVER_AUTH, "true");
+  }
+
+  /**
+   * Charset used when converting between Base64 text and bytes. Base64
+   * output is plain ASCII, so naming the charset explicitly keeps the
+   * conversion independent of the platform default charset.
+   */
+  private static final java.nio.charset.Charset BASE64_CHARSET =
+      java.nio.charset.Charset.forName("UTF-8");
+
+  /**
+   * Base64-encode a token identifier.
+   * @param identifier the raw identifier bytes
+   * @return the Base64 text of the identifier
+   */
+  static String encodeIdentifier(byte[] identifier) {
+    return new String(Base64.encodeBase64(identifier), BASE64_CHARSET);
+  }
+
+  /**
+   * Reverse of {@link #encodeIdentifier(byte[])}.
+   * @param identifier the Base64 text of an identifier
+   * @return the decoded identifier bytes
+   */
+  static byte[] decodeIdentifier(String identifier) {
+    return Base64.decodeBase64(identifier.getBytes(BASE64_CHARSET));
+  }
+
+  /**
+   * Base64-encode a token password.
+   * @param password the raw password bytes
+   * @return the Base64 text of the password as a char array
+   */
+  static char[] encodePassword(byte[] password) {
+    return new String(Base64.encodeBase64(password), BASE64_CHARSET)
+        .toCharArray();
+  }
+
+  /** Splitting fully qualified Kerberos name into parts */
+  public static String[] splitKerberosName(String fullName) {
+    return fullName.split("[/@]");
+  }
+
+  /** Authentication method */
+  public static enum AuthMethod {
+    SIMPLE((byte) 80, ""), // no authentication
+    KERBEROS((byte) 81, "GSSAPI"), // SASL Kerberos authentication
+    DIGEST((byte) 82, "DIGEST-MD5"); // SASL DIGEST-MD5 authentication
+
+    /** The code for this method. */
+    public final byte code;
+    /** The SASL mechanism name for this method (empty for SIMPLE). */
+    public final String mechanismName;
+
+    private AuthMethod(byte code, String mechanismName) {
+      this.code = code;
+      this.mechanismName = mechanismName;
+    }
+
+    private static final int FIRST_CODE = values()[0].code;
+
+    /**
+     * Return the object represented by the code, or null if the code does
+     * not map to a constant. The lookup indexes by (code - FIRST_CODE), so
+     * it relies on the constants being declared with consecutive codes.
+     */
+    private static AuthMethod valueOf(byte code) {
+      // values() clones the array on every call; fetch it once
+      final AuthMethod[] methods = values();
+      final int i = (code & 0xff) - FIRST_CODE;
+      return i < 0 || i >= methods.length ? null : methods[i];
+    }
+
+    /** Return the SASL mechanism name */
+    public String getMechanismName() {
+      return mechanismName;
+    }
+
+    /** Read one byte from in; returns null if it is not a known code */
+    public static AuthMethod read(DataInput in) throws IOException {
+      return valueOf(in.readByte());
+    }
+
+    /** Write the code of this method to out */
+    public void write(DataOutput out) throws IOException {
+      out.write(code);
+    }
+  }
+
+  /** CallbackHandler for SASL DIGEST-MD5 mechanism */
+  public static class SaslDigestCallbackHandler implements CallbackHandler {
+    private final SecretManager<TokenIdentifier> secretManager;
+
+    /**
+     * @param secretManager used to create empty token identifiers and to
+     *          retrieve the password of a token identifier
+     */
+    public SaslDigestCallbackHandler(
+        SecretManager<TokenIdentifier> secretManager) {
+      this.secretManager = secretManager;
+    }
+
+    /**
+     * Decode the Base64 id and read it into a new identifier obtained from
+     * the secret manager.
+     */
+    private TokenIdentifier getIdentifier(String id) throws IOException {
+      byte[] tokenId = decodeIdentifier(id);
+      TokenIdentifier tokenIdentifier = secretManager.createIdentifier();
+      tokenIdentifier.readFields(new DataInputStream(new ByteArrayInputStream(
+          tokenId)));
+      return tokenIdentifier;
+    }
+
+    /** Retrieve the password of tokenid and Base64-encode it. */
+    private char[] getPassword(TokenIdentifier tokenid) throws IOException {
+      return encodePassword(secretManager.retrievePassword(tokenid));
+    }
+
+    /**
+     * Answers password and authorize callbacks. The password is looked up
+     * from the token identifier carried as the name callback's default
+     * name; authorization is granted only when the authentication and
+     * authorization IDs are equal, and the authorized ID is then set to the
+     * username held in the token identifier. Realm callbacks are ignored.
+     *
+     * @throws UnsupportedCallbackException if a callback is of an
+     *           unrecognized type, or a password callback arrives without
+     *           a name callback carrying a default name
+     */
+    @Override
+    public void handle(Callback[] callbacks) throws IOException,
+        UnsupportedCallbackException {
+      NameCallback nc = null;
+      PasswordCallback pc = null;
+      AuthorizeCallback ac = null;
+      for (Callback callback : callbacks) {
+        if (callback instanceof AuthorizeCallback) {
+          ac = (AuthorizeCallback) callback;
+        } else if (callback instanceof NameCallback) {
+          nc = (NameCallback) callback;
+        } else if (callback instanceof PasswordCallback) {
+          pc = (PasswordCallback) callback;
+        } else if (callback instanceof RealmCallback) {
+          continue; // realm is ignored
+        } else {
+          throw new UnsupportedCallbackException(callback,
+              "Unrecognized SASL DIGEST-MD5 Callback");
+        }
+      }
+      if (pc != null) {
+        // The identifier comes from the name callback; without it there is
+        // nothing to look the password up by (previously a bare NPE).
+        if (nc == null || nc.getDefaultName() == null) {
+          throw new UnsupportedCallbackException(pc,
+              "SASL DIGEST-MD5 PasswordCallback requires a NameCallback "
+              + "with a default name");
+        }
+        TokenIdentifier tokenIdentifier = getIdentifier(nc.getDefaultName());
+        char[] password = getPassword(tokenIdentifier);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("SASL server DIGEST-MD5 callback: setting password "
+              + "for client: " + tokenIdentifier.getUsername());
+        }
+        pc.setPassword(password);
+      }
+      if (ac != null) {
+        String authid = ac.getAuthenticationID();
+        String authzid = ac.getAuthorizationID();
+        // A missing authentication ID is treated as "not authorized"
+        ac.setAuthorized(authid != null && authid.equals(authzid));
+        if (ac.isAuthorized()) {
+          String username = getIdentifier(authzid).getUsername().toString();
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("SASL server DIGEST-MD5 callback: setting "
+                + "canonicalized client ID: " + username);
+          }
+          ac.setAuthorizedID(username);
+        }
+      }
+    }
+  }
+
+  /** CallbackHandler for SASL GSSAPI Kerberos mechanism */
+  public static class SaslGssCallbackHandler implements CallbackHandler {
+
+    /**
+     * Answers authorize callbacks only: authorization is granted when the
+     * authentication and authorization IDs are equal, and the authorized
+     * ID is then set to the authorization ID.
+     *
+     * @throws UnsupportedCallbackException if a callback is not an
+     *           AuthorizeCallback
+     */
+    @Override
+    public void handle(Callback[] callbacks) throws IOException,
+        UnsupportedCallbackException {
+      AuthorizeCallback ac = null;
+      for (Callback callback : callbacks) {
+        if (callback instanceof AuthorizeCallback) {
+          ac = (AuthorizeCallback) callback;
+        } else {
+          throw new UnsupportedCallbackException(callback,
+              "Unrecognized SASL GSSAPI Callback");
+        }
+      }
+      if (ac != null) {
+        String authid = ac.getAuthenticationID();
+        String authzid = ac.getAuthorizationID();
+        // A missing authentication ID is treated as "not authorized"
+        ac.setAuthorized(authid != null && authid.equals(authzid));
+        if (ac.isAuthorized()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("SASL server GSSAPI callback: setting "
+                + "canonicalized client ID: " + authzid);
+          }
+          ac.setAuthorizedID(authzid);
+        }
+      }
+    }
+  }
+}
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/security/token/SecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/security/token/SecretManager.java?rev=905860&r1=905859&r2=905860&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/security/token/SecretManager.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/security/token/SecretManager.java Wed Feb 3 01:30:25 2010
@@ -62,6 +62,12 @@
public abstract byte[] retrievePassword(T identifier) throws InvalidToken;
/**
+ * Create an empty token identifier.
+ * @return the newly created empty token identifier
+ */
+ public abstract T createIdentifier();
+
+ /**
* The name of the hashing algorithm.
*/
private static final String DEFAULT_HMAC_ALGORITHM = "HmacSHA1";
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/security/token/TokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/security/token/TokenIdentifier.java?rev=905860&r1=905859&r2=905860&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/security/token/TokenIdentifier.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/security/token/TokenIdentifier.java Wed Feb 3 01:30:25 2010
@@ -35,6 +35,12 @@
* @return the kind of the token
*/
public abstract Text getKind();
+
+ /**
+ * Get the username encoded in the token identifier
+ * @return the username
+ */
+ public abstract Text getUsername();
/**
* Get the bytes for the token identifier
Added: hadoop/common/trunk/src/java/org/apache/hadoop/security/token/TokenInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/security/token/TokenInfo.java?rev=905860&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/security/token/TokenInfo.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/security/token/TokenInfo.java Wed Feb 3 01:30:25 2010
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security.token;
+
+import java.lang.annotation.*;
+
+/**
+ * Type-level annotation, retained at runtime, naming the TokenSelector class to be used.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface TokenInfo {
+ /** The type of TokenSelector to be used */
+ Class<? extends TokenSelector<? extends TokenIdentifier>> value();
+}
\ No newline at end of file
Added: hadoop/common/trunk/src/java/org/apache/hadoop/security/token/TokenSelector.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/security/token/TokenSelector.java?rev=905860&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/security/token/TokenSelector.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/security/token/TokenSelector.java Wed Feb 3 01:30:25 2010
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security.token;
+
+import java.util.Collection;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * Selects, from a collection of tokens, the token of type T to use with a
+ * named service.
+ *
+ * @param <T> the TokenIdentifier subtype carried by the selected token
+ */
+public interface TokenSelector<T extends TokenIdentifier> {
+ Token<T> selectToken(Text service,
+ Collection<Token<? extends TokenIdentifier>> tokens);
+}
Modified: hadoop/common/trunk/src/test/core-site.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core-site.xml?rev=905860&r1=905859&r2=905860&view=diff
==============================================================================
--- hadoop/common/trunk/src/test/core-site.xml (original)
+++ hadoop/common/trunk/src/test/core-site.xml Wed Feb 3 01:30:25 2010
@@ -63,4 +63,10 @@
<description>The name of the s3n file system for testing.</description>
</property>
+<!-- Turn security off for tests by default -->
+<property>
+ <name>hadoop.security.authentication</name>
+ <value>simple</value>
+</property>
+
</configuration>
Modified: hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java?rev=905860&r1=905859&r2=905860&view=diff
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java (original)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java Wed Feb 3 01:30:25 2010
@@ -68,7 +68,7 @@
int[] exchange(int[] values) throws IOException;
}
- public class TestImpl implements TestProtocol {
+ public static class TestImpl implements TestProtocol {
int fastPingCounter = 0;
public long getProtocolVersion(String protocol, long clientVersion) {
@@ -189,7 +189,7 @@
System.out.println("Testing Slow RPC");
// create a server with two handlers
Server server = RPC.getServer(TestProtocol.class,
- new TestImpl(), ADDRESS, 0, 2, false, conf);
+ new TestImpl(), ADDRESS, 0, 2, false, conf, null);
TestProtocol proxy = null;
try {
@@ -339,7 +339,7 @@
ServiceAuthorizationManager.refresh(conf, new TestPolicyProvider());
Server server = RPC.getServer(TestProtocol.class,
- new TestImpl(), ADDRESS, 0, 5, true, conf);
+ new TestImpl(), ADDRESS, 0, 5, true, conf, null);
TestProtocol proxy = null;