You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:46:20 UTC
svn commit: r1077150 [1/2] - in
/hadoop/common/branches/branch-0.20-security-patches/src:
core/org/apache/hadoop/fs/ core/org/apache/hadoop/ipc/
core/org/apache/hadoop/security/ core/org/apache/hadoop/security/token/
hdfs/org/apache/hadoop/hdfs/ hdfs/o...
Author: omalley
Date: Fri Mar 4 03:46:18 2011
New Revision: 1077150
URL: http://svn.apache.org/viewvc?rev=1077150&view=rev
Log:
commit 74c091945aaea041bf4862d0bff1fd68b5d45625
Author: Devaraj Das <dd...@yahoo-inc.com>
Date: Fri Feb 5 15:35:16 2010 -0800
HADOOP-6419 from https://issues.apache.org/jira/secure/attachment/12434998/HADOOP-6419-0.20-15.patch
+++ b/YAHOO-CHANGES.txt
+ HADOOP-6419. Adds SASL based authentication to RPC. Also includes the
+ MAPREDUCE-1335 and HDFS-933 patches. Contributed by Kan Zhang.
+ (ddas)
+
Added:
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/CommonConfigurationKeys.java
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/KerberosInfo.java
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/SaslInputStream.java
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/SaslOutputStream.java
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/SaslRpcClient.java
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/SaslRpcServer.java
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/token/TokenInfo.java
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/token/TokenSelector.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/DelegationTokenSelector.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/token/JobTokenSelector.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/security/TestClientProtocolWithDelegationToken.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/ipc/TestSaslRPC.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/Client.java
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/RPC.java
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/Server.java
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/UserGroupInformation.java
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/token/SecretManager.java
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/token/TokenIdentifier.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/DelegationTokenSecretManager.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/AdminOperationsProtocol.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmTask.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTaskCompletionEventsUpdate.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/token/JobTokenSecretManager.java
hadoop/common/branches/branch-0.20-security-patches/src/test/core-site.xml
hadoop/common/branches/branch-0.20-security-patches/src/test/findbugsExcludeFile.xml
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/ipc/TestRPC.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/security/TestUserGroupInformation.java
Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/CommonConfigurationKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/CommonConfigurationKeys.java?rev=1077150&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/CommonConfigurationKeys.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/CommonConfigurationKeys.java Fri Mar 4 03:46:18 2011
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+/**
+ * This class contains constants for configuration keys used
+ * in the common code.
+ *
+ */
+
+public class CommonConfigurationKeys {
+
+ // The Keys
+ // Convention: each FOO_KEY holds the configuration property name; the
+ // matching FOO_DEFAULT (where present) holds its default value.
+ public static final String IO_NATIVE_LIB_AVAILABLE_KEY =
+ "io.native.lib.available";
+ public static final boolean IO_NATIVE_LIB_AVAILABLE_DEFAULT = true;
+ public static final String NET_TOPOLOGY_SCRIPT_NUMBER_ARGS_KEY =
+ "net.topology.script.number.args";
+ public static final int NET_TOPOLOGY_SCRIPT_NUMBER_ARGS_DEFAULT = 100;
+
+ //FS keys
+ public static final String FS_HOME_DIR_KEY = "fs.homeDir";
+ public static final String FS_HOME_DIR_DEFAULT = "/user";
+ public static final String FS_DEFAULT_NAME_KEY = "fs.defaultFS";
+ public static final String FS_DEFAULT_NAME_DEFAULT = "file:///";
+ public static final String FS_PERMISSIONS_UMASK_KEY = "fs.permissions.umask-mode";
+ // NOTE: leading zero makes this an octal literal (umask 022)
+ public static final int FS_PERMISSIONS_UMASK_DEFAULT = 0022;
+ public static final String FS_DF_INTERVAL_KEY = "fs.df.interval";
+ // presumably milliseconds (60000 = 1 min) — TODO confirm against consumers
+ public static final long FS_DF_INTERVAL_DEFAULT = 60000;
+
+
+ //Defaults are not specified for following keys
+ public static final String NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY =
+ "net.topology.script.file.name";
+ public static final String NET_TOPOLOGY_CONFIGURED_NODE_MAPPING_KEY =
+ "net.topology.configured.node.mapping";
+ public static final String NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY =
+ "net.topology.node.switch.mapping.impl";
+
+ public static final String FS_CLIENT_BUFFER_DIR_KEY =
+ "fs.client.buffer.dir";
+
+ //TBD: Code is not updated to use following keys.
+ //These keys will be used in later versions
+ //
+ public static final long FS_LOCAL_BLOCK_SIZE_DEFAULT = 32*1024*1024;
+ public static final String FS_AUTOMATIC_CLOSE_KEY = "fs.automatic.close";
+ public static final boolean FS_AUTOMATIC_CLOSE_DEFAULT = true;
+ public static final String FS_FILE_IMPL_KEY = "fs.file.impl";
+ public static final String FS_FTP_HOST_KEY = "fs.ftp.host";
+ public static final String FS_FTP_HOST_PORT_KEY = "fs.ftp.host.port";
+ public static final String FS_TRASH_INTERVAL_KEY = "fs.trash.interval";
+ // 0 — trash feature disabled by default
+ public static final long FS_TRASH_INTERVAL_DEFAULT = 0;
+
+ public static final String IO_MAPFILE_BLOOM_SIZE_KEY = "io.mapfile.bloom.size";
+ public static final int IO_MAPFILE_BLOOM_SIZE_DEFAULT = 1024*1024;
+ public static final String IO_MAPFILE_BLOOM_ERROR_RATE_KEY =
+ "io.mapfile.bloom.error.rate" ;
+ public static final float IO_MAPFILE_BLOOM_ERROR_RATE_DEFAULT = 0.005f;
+ public static final String IO_COMPRESSION_CODEC_LZO_CLASS_KEY = "io.compression.codec.lzo.class";
+ public static final String IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY =
+ "io.compression.codec.lzo.buffersize";
+ public static final int IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT = 64*1024;
+ public static final String IO_MAP_INDEX_INTERVAL_KEY = "io.map.index.interval";
+ public static final int IO_MAP_INDEX_INTERVAL_DEFAULT = 128;
+ public static final String IO_MAP_INDEX_SKIP_KEY = "io.map.index.skip";
+ public static final int IO_MAP_INDEX_SKIP_DEFAULT = 0;
+ public static final String IO_SEQFILE_COMPRESS_BLOCKSIZE_KEY = "io.seqfile.compress.blocksize";
+ public static final int IO_SEQFILE_COMPRESS_BLOCKSIZE_DEFAULT = 1000000;
+ public static final String IO_SKIP_CHECKSUM_ERRORS_KEY = "io.skip.checksum.errors";
+ public static final boolean IO_SKIP_CHECKSUM_ERRORS_DEFAULT = false;
+ public static final String IO_SORT_MB_KEY = "io.sort.mb";
+ public static final int IO_SORT_MB_DEFAULT = 100;
+ public static final String IO_SORT_FACTOR_KEY = "io.sort.factor";
+ public static final int IO_SORT_FACTOR_DEFAULT = 100;
+ public static final String IO_SERIALIZATIONS_KEY = "io.serializations";
+
+ public static final String TFILE_IO_CHUNK_SIZE_KEY = "tfile.io.chunk.size";
+ public static final int TFILE_IO_CHUNK_SIZE_DEFAULT = 1024*1024;
+ public static final String TFILE_FS_INPUT_BUFFER_SIZE_KEY = "tfile.fs.input.buffer.size";
+ public static final int TFILE_FS_INPUT_BUFFER_SIZE_DEFAULT = 256*1024;
+ public static final String TFILE_FS_OUTPUT_BUFFER_SIZE_KEY = "tfile.fs.output.buffer.size";
+ public static final int TFILE_FS_OUTPUT_BUFFER_SIZE_DEFAULT = 256*1024;
+
+ // IPC keys — intervals/idle times presumably in milliseconds; verify
+ // against org.apache.hadoop.ipc.Client/Server usage
+ public static final String IPC_PING_INTERVAL_KEY = "ipc.ping.interval";
+ public static final int IPC_PING_INTERVAL_DEFAULT = 60000;
+ public static final String IPC_CLIENT_PING_KEY = "ipc.client.ping";
+ public static final boolean IPC_CLIENT_PING_DEFAULT = true;
+ public static final String IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY =
+ "ipc.client.connection.maxidletime";
+ public static final int IPC_CLIENT_CONNECTION_MAXIDLETIME_DEFAULT = 10000;
+ public static final String IPC_CLIENT_CONNECT_MAX_RETRIES_KEY =
+ "ipc.client.connect.max.retries";
+ public static final int IPC_CLIENT_CONNECT_MAX_RETRIES_DEFAULT = 10;
+ public static final String IPC_CLIENT_TCPNODELAY_KEY = "ipc.client.tcpnodelay";
+ public static final boolean IPC_CLIENT_TCPNODELAY_DEFAULT = false;
+ public static final String IPC_SERVER_LISTEN_QUEUE_SIZE_KEY =
+ "ipc.server.listen.queue.size";
+ public static final int IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT = 128;
+ public static final String IPC_CLIENT_KILL_MAX_KEY = "ipc.client.kill.max";
+ public static final int IPC_CLIENT_KILL_MAX_DEFAULT = 10;
+ public static final String IPC_CLIENT_IDLETHRESHOLD_KEY = "ipc.client.idlethreshold";
+ public static final int IPC_CLIENT_IDLETHRESHOLD_DEFAULT = 4000;
+ public static final String IPC_SERVER_TCPNODELAY_KEY = "ipc.server.tcpnodelay";
+ public static final boolean IPC_SERVER_TCPNODELAY_DEFAULT = false;
+
+ // Miscellaneous hadoop.* keys, including the security keys introduced
+ // by this patch. HADOOP_SECURITY_AUTHENTICATION selects the auth mode
+ // consulted by UserGroupInformation.isSecurityEnabled() — NOTE(review):
+ // expected values look like "simple"/"kerberos"; confirm in UGI.
+ public static final String HADOOP_RPC_SOCKET_FACTORY_CLASS_DEFAULT_KEY =
+ "hadoop.rpc.socket.factory.class.default";
+ public static final String HADOOP_SOCKS_SERVER_KEY = "hadoop.socks.server";
+ public static final String HADOOP_JOB_UGI_KEY = "hadoop.job.ugi";
+ public static final String HADOOP_UTIL_HASH_TYPE_KEY = "hadoop.util.hash.type";
+ public static final String HADOOP_UTIL_HASH_TYPE_DEFAULT = "murmur";
+ public static final String HADOOP_SECURITY_GROUP_MAPPING = "hadoop.security.group.mapping";
+ public static final String HADOOP_SECURITY_GROUPS_CACHE_SECS = "hadoop.security.groups.cache.secs";
+ public static final String HADOOP_SECURITY_AUTHENTICATION = "hadoop.security.authentication";
+}
+
Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/Client.java?rev=1077150&r1=1077149&r2=1077150&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/Client.java Fri Mar 4 03:46:18 2011
@@ -31,7 +31,9 @@ import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.FilterInputStream;
import java.io.InputStream;
+import java.io.OutputStream;
+import java.security.PrivilegedExceptionAction;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map.Entry;
@@ -44,11 +46,19 @@ import org.apache.commons.logging.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.security.SaslRpcClient;
+import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
+import org.apache.hadoop.security.token.TokenInfo;
import org.apache.hadoop.util.ReflectionUtils;
/** A client for an IPC service. IPC calls take a single {@link Writable} as a
@@ -175,8 +185,13 @@ public class Client {
* socket: responses may be delivered out of order. */
private class Connection extends Thread {
private InetSocketAddress server; // server ip:port
+ private String serverPrincipal; // server's krb5 principal name
private ConnectionHeader header; // connection header
- private ConnectionId remoteId; // connection id
+ private final ConnectionId remoteId; // connection id
+ private final AuthMethod authMethod; // authentication method
+ private final boolean useSasl;
+ private Token<? extends TokenIdentifier> token;
+ private SaslRpcClient saslRpcClient;
private Socket socket = null; // connected socket
private DataInputStream in;
@@ -200,6 +215,42 @@ public class Client {
Class<?> protocol = remoteId.getProtocol();
header =
new ConnectionHeader(protocol == null ? null : protocol.getName(), ticket);
+ this.useSasl = UserGroupInformation.isSecurityEnabled();
+ if (useSasl && protocol != null) {
+ TokenInfo tokenInfo = protocol.getAnnotation(TokenInfo.class);
+ if (tokenInfo != null) {
+ TokenSelector<? extends TokenIdentifier> tokenSelector = null;
+ try {
+ tokenSelector = tokenInfo.value().newInstance();
+ } catch (InstantiationException e) {
+ throw new IOException(e.toString());
+ } catch (IllegalAccessException e) {
+ throw new IOException(e.toString());
+ }
+ InetSocketAddress addr = remoteId.getAddress();
+ token = tokenSelector.selectToken(new Text(addr.getAddress()
+ .getHostAddress() + ":" + addr.getPort()),
+ ticket.getTokens());
+ }
+ KerberosInfo krbInfo = protocol.getAnnotation(KerberosInfo.class);
+ if (krbInfo != null) {
+ String serverKey = krbInfo.value();
+ if (serverKey != null) {
+ serverPrincipal = conf.get(serverKey);
+ }
+ }
+ }
+
+ if (!useSasl) {
+ authMethod = AuthMethod.SIMPLE;
+ } else if (token != null) {
+ authMethod = AuthMethod.DIGEST;
+ } else {
+ authMethod = AuthMethod.KERBEROS;
+ }
+ if (LOG.isDebugEnabled())
+ LOG.debug("Use " + authMethod + " authentication for protocol "
+ + protocol.getSimpleName());
this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " +
remoteId.getAddress().toString() +
@@ -281,11 +332,20 @@ public class Client {
}
}
+ private synchronized void disposeSasl() {
+ if (saslRpcClient != null) {
+ try {
+ saslRpcClient.dispose();
+ } catch (IOException ignored) {
+ }
+ }
+ }
+
/** Connect to the server and set up the I/O streams. It then sends
* a header to the server and starts
* the connection thread that waits for responses.
*/
- private synchronized void setupIOstreams() {
+ private synchronized void setupIOstreams() throws InterruptedException {
if (socket != null || shouldCloseConnection.get()) {
return;
}
@@ -313,10 +373,28 @@ public class Client {
handleConnectionFailure(ioFailures++, maxRetries, ie);
}
}
+ InputStream inStream = NetUtils.getInputStream(socket);
+ OutputStream outStream = NetUtils.getOutputStream(socket);
+ writeRpcHeader(outStream);
+ if (useSasl) {
+ final InputStream in2 = inStream;
+ final OutputStream out2 = outStream;
+ remoteId.getTicket().doAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws IOException {
+ saslRpcClient = new SaslRpcClient(authMethod, token,
+ serverPrincipal);
+ saslRpcClient.saslConnect(in2, out2);
+ return null;
+ }
+ });
+ inStream = saslRpcClient.getInputStream(inStream);
+ outStream = saslRpcClient.getOutputStream(outStream);
+ }
this.in = new DataInputStream(new BufferedInputStream
- (new PingInputStream(NetUtils.getInputStream(socket))));
+ (new PingInputStream(inStream)));
this.out = new DataOutputStream
- (new BufferedOutputStream(NetUtils.getOutputStream(socket)));
+ (new BufferedOutputStream(outStream));
writeHeader();
// update last activity time
@@ -370,14 +448,20 @@ public class Client {
". Already tried " + curRetries + " time(s).");
}
- /* Write the header for each connection
+ /* Write the RPC header */
+ private void writeRpcHeader(OutputStream outStream) throws IOException {
+ DataOutputStream out = new DataOutputStream(new BufferedOutputStream(outStream));
+ // Write out the header, version and authentication method
+ out.write(Server.HEADER.array());
+ out.write(Server.CURRENT_VERSION);
+ authMethod.write(out);
+ out.flush();
+ }
+
+ /* Write the protocol header for each connection
* Out is not synchronized because only the first thread does this.
*/
private void writeHeader() throws IOException {
- // Write out the header and version
- out.write(Server.HEADER.array());
- out.write(Server.CURRENT_VERSION);
-
// Write out the ConnectionHeader
DataOutputBuffer buf = new DataOutputBuffer();
header.write(buf);
@@ -548,6 +632,7 @@ public class Client {
// close the streams and therefore the socket
IOUtils.closeStream(out);
IOUtils.closeStream(in);
+ disposeSasl();
// clean up all calls
if (closeException == null) {
@@ -787,7 +872,7 @@ public class Client {
*/
@Deprecated
public Writable[] call(Writable[] params, InetSocketAddress[] addresses)
- throws IOException {
+ throws IOException, InterruptedException {
return call(params, addresses, null, null);
}
@@ -797,7 +882,7 @@ public class Client {
* contains nulls for calls that timed out or errored. */
public Writable[] call(Writable[] params, InetSocketAddress[] addresses,
Class<?> protocol, UserGroupInformation ticket)
- throws IOException {
+ throws IOException, InterruptedException {
if (addresses.length == 0) return new Writable[0];
ParallelResults results = new ParallelResults(params.length);
@@ -831,7 +916,7 @@ public class Client {
Class<?> protocol,
UserGroupInformation ticket,
Call call)
- throws IOException {
+ throws IOException, InterruptedException {
if (!running.get()) {
// the client is stopped
throw new IOException("The client is stopped");
Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/RPC.java?rev=1077150&r1=1077149&r2=1077150&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/RPC.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/RPC.java Fri Mar 4 03:46:18 2011
@@ -37,6 +37,8 @@ import org.apache.commons.logging.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
@@ -400,7 +402,7 @@ public class RPC {
*/
public static Object[] call(Method method, Object[][] params,
InetSocketAddress[] addrs, Configuration conf)
- throws IOException {
+ throws IOException, InterruptedException {
return call(method, params, addrs, null, conf);
}
@@ -408,7 +410,7 @@ public class RPC {
public static Object[] call(Method method, Object[][] params,
InetSocketAddress[] addrs,
UserGroupInformation ticket, Configuration conf)
- throws IOException {
+ throws IOException, InterruptedException {
Invocation[] invocations = new Invocation[params.length];
for (int i = 0; i < params.length; i++)
@@ -447,9 +449,19 @@ public class RPC {
final int numHandlers,
final boolean verbose, Configuration conf)
throws IOException {
- return new Server(instance, conf, bindAddress, port, numHandlers, verbose);
+ return getServer(instance, bindAddress, port, numHandlers, verbose, conf, null);
}
+ /** Construct a server for a protocol implementation instance listening on a
+ * port and address, with a secret manager. */
+ public static Server getServer(final Object instance, final String bindAddress, final int port,
+ final int numHandlers,
+ final boolean verbose, Configuration conf,
+ SecretManager<? extends TokenIdentifier> secretManager)
+ throws IOException {
+ return new Server(instance, conf, bindAddress, port, numHandlers, verbose, secretManager);
+ }
+
/** An RPC Server. */
public static class Server extends org.apache.hadoop.ipc.Server {
private Object instance;
@@ -463,7 +475,7 @@ public class RPC {
*/
public Server(Object instance, Configuration conf, String bindAddress, int port)
throws IOException {
- this(instance, conf, bindAddress, port, 1, false);
+ this(instance, conf, bindAddress, port, 1, false, null);
}
private static String classNameBase(String className) {
@@ -483,8 +495,11 @@ public class RPC {
* @param verbose whether each call should be logged
*/
public Server(Object instance, Configuration conf, String bindAddress, int port,
- int numHandlers, boolean verbose) throws IOException {
- super(bindAddress, port, Invocation.class, numHandlers, conf, classNameBase(instance.getClass().getName()));
+ int numHandlers, boolean verbose,
+ SecretManager<? extends TokenIdentifier> secretManager)
+ throws IOException {
+ super(bindAddress, port, Invocation.class, numHandlers, conf,
+ classNameBase(instance.getClass().getName()), secretManager);
this.instance = instance;
this.verbose = verbose;
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/Server.java?rev=1077150&r1=1077149&r2=1077150&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/Server.java Fri Mar 4 03:46:18 2011
@@ -32,6 +32,7 @@ import java.net.SocketException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
+import java.nio.channels.Channels;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
@@ -39,7 +40,6 @@ import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
-import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
@@ -52,15 +52,26 @@ import java.util.concurrent.BlockingQueu
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.ipc.metrics.RpcMetrics;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
+import org.apache.hadoop.security.SaslRpcServer.SaslDigestCallbackHandler;
+import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
@@ -80,7 +91,8 @@ public abstract class Server {
// 1 : Introduce ping and server does not throw away RPCs
// 3 : Introduce the protocol into the RPC connection header
- public static final byte CURRENT_VERSION = 3;
+ // 4 : Introduced SASL security layer
+ public static final byte CURRENT_VERSION = 4;
/**
* How many calls/handler are allowed in the queue.
@@ -152,6 +164,7 @@ public abstract class Server {
protected RpcMetrics rpcMetrics;
private Configuration conf;
+ private SecretManager<TokenIdentifier> secretManager;
private int maxQueueSize;
private int socketSendBufferSize;
@@ -424,7 +437,7 @@ public abstract class Server {
if (count < 0) {
if (LOG.isDebugEnabled())
LOG.debug(getName() + ": disconnecting client " +
- c.getHostAddress() + ". Number of active connections: "+
+ c + ". Number of active connections: "+
numConnections);
closeConnection(c);
c = null;
@@ -694,8 +707,7 @@ public abstract class Server {
/** Reads calls from a connection and queues them for handling. */
private class Connection {
- private boolean versionRead = false; //if initial signature and
- //version are read
+ private boolean rpcHeaderRead = false; // if initial rpc header is read
private boolean headerRead = false; //if the connection header that
//follows version is read.
@@ -714,6 +726,13 @@ public abstract class Server {
ConnectionHeader header = new ConnectionHeader();
Class<?> protocol;
+ boolean useSasl;
+ SaslServer saslServer;
+ private AuthMethod authMethod;
+ private boolean saslContextEstablished;
+ private ByteBuffer rpcHeaderBuffer;
+ private ByteBuffer unwrappedData;
+ private ByteBuffer unwrappedDataLengthBuffer;
UserGroupInformation user = null;
@@ -722,6 +741,10 @@ public abstract class Server {
private final Call authFailedCall =
new Call(AUTHROIZATION_FAILED_CALLID, null, null);
private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream();
+ // Fake 'call' for SASL context setup
+ private static final int SASL_CALLID = -33;
+ private final Call saslCall = new Call(SASL_CALLID, null, null);
+ private final ByteArrayOutputStream saslResponse = new ByteArrayOutputStream();
public Connection(SelectionKey key, SocketChannel channel,
long lastContact) {
@@ -729,6 +752,8 @@ public abstract class Server {
this.lastContact = lastContact;
this.data = null;
this.dataLengthBuffer = ByteBuffer.allocate(4);
+ this.unwrappedData = null;
+ this.unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
this.socket = channel.socket();
InetAddress addr = socket.getInetAddress();
if (addr == null) {
@@ -786,6 +811,92 @@ public abstract class Server {
return false;
}
+ private void saslReadAndProcess(byte[] saslToken) throws IOException,
+ InterruptedException {
+ if (!saslContextEstablished) {
+ if (saslServer == null) {
+ switch (authMethod) {
+ case DIGEST:
+ saslServer = Sasl.createSaslServer(AuthMethod.DIGEST
+ .getMechanismName(), null, SaslRpcServer.SASL_DEFAULT_REALM,
+ SaslRpcServer.SASL_PROPS, new SaslDigestCallbackHandler(
+ secretManager));
+ break;
+ default:
+ UserGroupInformation current = UserGroupInformation
+ .getCurrentUser();
+ String fullName = current.getUserName();
+ if (LOG.isDebugEnabled())
+ LOG.debug("Kerberos principal name is " + fullName);
+ final String names[] = SaslRpcServer.splitKerberosName(fullName);
+ if (names.length != 3) {
+ throw new IOException(
+ "Kerberos principal name does NOT have the expected "
+ + "hostname part: " + fullName);
+ }
+ current.doAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws IOException {
+ saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS
+ .getMechanismName(), names[0], names[1],
+ SaslRpcServer.SASL_PROPS, new SaslGssCallbackHandler());
+ return null;
+ }
+ });
+ }
+ if (saslServer == null)
+ throw new IOException(
+ "Unable to find SASL server implementation for "
+ + authMethod.getMechanismName());
+ if (LOG.isDebugEnabled())
+ LOG.debug("Created SASL server with mechanism = "
+ + authMethod.getMechanismName());
+ }
+ if (LOG.isDebugEnabled())
+ LOG.debug("Have read input token of size " + saslToken.length
+ + " for processing by saslServer.evaluateResponse()");
+ byte[] replyToken = saslServer.evaluateResponse(saslToken);
+ if (replyToken != null) {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Will send token of size " + replyToken.length
+ + " from saslServer.");
+ saslCall.connection = this;
+ saslResponse.reset();
+ DataOutputStream out = new DataOutputStream(saslResponse);
+ out.writeInt(replyToken.length);
+ out.write(replyToken, 0, replyToken.length);
+ saslCall.setResponse(ByteBuffer.wrap(saslResponse.toByteArray()));
+ responder.doRespond(saslCall);
+ }
+ if (saslServer.isComplete()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("SASL server context established. Negotiated QoP is "
+ + saslServer.getNegotiatedProperty(Sasl.QOP));
+ }
+ user = UserGroupInformation.createRemoteUser(saslServer
+ .getAuthorizationID());
+ LOG.info("SASL server successfully authenticated client: " + user);
+ saslContextEstablished = true;
+ }
+ } else {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Have read input token of size " + saslToken.length
+ + " for processing by saslServer.unwrap()");
+ byte[] plaintextData = saslServer
+ .unwrap(saslToken, 0, saslToken.length);
+ processUnwrappedData(plaintextData);
+ }
+ }
+
+ private void disposeSasl() {
+ if (saslServer != null) {
+ try {
+ saslServer.dispose();
+ } catch (SaslException ignored) {
+ }
+ }
+ }
+
public int readAndProcess() throws IOException, InterruptedException {
while (true) {
/* Read at most one RPC. If the header is not read completely yet
@@ -798,14 +909,33 @@ public abstract class Server {
return count;
}
- if (!versionRead) {
+ if (!rpcHeaderRead) {
//Every connection is expected to send the header.
- ByteBuffer versionBuffer = ByteBuffer.allocate(1);
- count = channelRead(channel, versionBuffer);
- if (count <= 0) {
+ if (rpcHeaderBuffer == null) {
+ rpcHeaderBuffer = ByteBuffer.allocate(2);
+ }
+ count = channelRead(channel, rpcHeaderBuffer);
+ if (count < 0 || rpcHeaderBuffer.remaining() > 0) {
return count;
}
- int version = versionBuffer.get(0);
+ int version = rpcHeaderBuffer.get(0);
+ byte[] method = new byte[] {rpcHeaderBuffer.get(1)};
+ authMethod = AuthMethod.read(new DataInputStream(
+ new ByteArrayInputStream(method)));
+ if (authMethod == null) {
+ throw new IOException("Unable to read authentication method");
+ }
+ if (UserGroupInformation.isSecurityEnabled()
+ && authMethod == AuthMethod.SIMPLE) {
+ throw new IOException("Authentication is required");
+ }
+ if (!UserGroupInformation.isSecurityEnabled()
+ && authMethod != AuthMethod.SIMPLE) {
+ throw new IOException("Authentication is not supported");
+ }
+ if (authMethod != AuthMethod.SIMPLE) {
+ useSasl = true;
+ }
dataLengthBuffer.flip();
if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) {
@@ -817,7 +947,8 @@ public abstract class Server {
return -1;
}
dataLengthBuffer.clear();
- versionRead = true;
+ rpcHeaderBuffer = null;
+ rpcHeaderRead = true;
continue;
}
@@ -825,12 +956,11 @@ public abstract class Server {
dataLengthBuffer.flip();
dataLength = dataLengthBuffer.getInt();
- if (dataLength == Client.PING_CALL_ID) {
+ if (!useSasl && dataLength == Client.PING_CALL_ID) {
dataLengthBuffer.clear();
return 0; //ping message
}
data = ByteBuffer.allocate(dataLength);
- incRpcCount(); // Increment the rpc count
}
count = channelRead(channel, data);
@@ -838,33 +968,14 @@ public abstract class Server {
if (data.remaining() == 0) {
dataLengthBuffer.clear();
data.flip();
- if (headerRead) {
- processData();
- data = null;
- return count;
+ boolean isHeaderRead = headerRead;
+ if (useSasl) {
+ saslReadAndProcess(data.array());
} else {
- processHeader();
- headerRead = true;
- data = null;
-
- // Authorize the connection
- try {
- authorize(user, header);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Successfully authorized " + header);
- }
- } catch (AuthorizationException ae) {
- authFailedCall.connection = this;
- setupResponse(authFailedResponse, authFailedCall,
- Status.FATAL, null,
- ae.getClass().getName(), ae.getMessage());
- responder.doRespond(authFailedCall);
-
- // Close this connection
- return -1;
- }
-
+ processOneRpc(data.array());
+ }
+ data = null;
+ if (!isHeaderRead) {
continue;
}
}
@@ -873,9 +984,9 @@ public abstract class Server {
}
/// Reads the connection header following version
- private void processHeader() throws IOException {
+ private void processHeader(byte[] buf) throws IOException {
DataInputStream in =
- new DataInputStream(new ByteArrayInputStream(data.array()));
+ new DataInputStream(new ByteArrayInputStream(buf));
header.readFields(in);
try {
String protocolClassName = header.getProtocol();
@@ -886,12 +997,73 @@ public abstract class Server {
throw new IOException("Unknown protocol: " + header.getProtocol());
}
- user = header.getUgi();
+ UserGroupInformation protocolUser = header.getUgi();
+ if (!useSasl) {
+ user = protocolUser;
+ } else if (protocolUser != null && !protocolUser.equals(user)) {
+ throw new AccessControlException("Authenticated user (" + user
+ + ") doesn't match what the client claims to be (" + protocolUser
+ + ")");
+ }
}
- private void processData() throws IOException, InterruptedException {
+ private void processUnwrappedData(byte[] inBuf) throws IOException,
+ InterruptedException {
+ ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(
+ inBuf));
+ // Read all RPCs contained in the inBuf, even partial ones
+ while (true) {
+ int count = -1;
+ if (unwrappedDataLengthBuffer.remaining() > 0) {
+ count = channelRead(ch, unwrappedDataLengthBuffer);
+ if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0)
+ return;
+ }
+
+ if (unwrappedData == null) {
+ unwrappedDataLengthBuffer.flip();
+ int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
+
+ if (unwrappedDataLength == Client.PING_CALL_ID) {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Received ping message");
+ unwrappedDataLengthBuffer.clear();
+ continue; // ping message
+ }
+ unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
+ }
+
+ count = channelRead(ch, unwrappedData);
+ if (count <= 0 || unwrappedData.remaining() > 0)
+ return;
+
+ if (unwrappedData.remaining() == 0) {
+ unwrappedDataLengthBuffer.clear();
+ unwrappedData.flip();
+ processOneRpc(unwrappedData.array());
+ unwrappedData = null;
+ }
+ }
+ }
+
+ private void processOneRpc(byte[] buf) throws IOException,
+ InterruptedException {
+ if (headerRead) {
+ processData(buf);
+ } else {
+ processHeader(buf);
+ headerRead = true;
+ if (!authorizeConnection()) {
+ throw new AccessControlException("Connection from " + this
+ + " for protocol " + header.getProtocol()
+ + " is unauthorized for user " + user);
+ }
+ }
+ }
+
+ private void processData(byte[] buf) throws IOException, InterruptedException {
DataInputStream dis =
- new DataInputStream(new ByteArrayInputStream(data.array()));
+ new DataInputStream(new ByteArrayInputStream(buf));
int id = dis.readInt(); // try to read an id
if (LOG.isDebugEnabled())
@@ -902,9 +1074,27 @@ public abstract class Server {
Call call = new Call(id, param, this);
callQueue.put(call); // queue the call; maybe blocked here
+ incRpcCount(); // Increment the rpc count
}
+ private boolean authorizeConnection() throws IOException {
+ try {
+ authorize(user, header);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Successfully authorized " + header);
+ }
+ } catch (AuthorizationException ae) {
+ authFailedCall.connection = this;
+ setupResponse(authFailedResponse, authFailedCall, Status.FATAL, null,
+ ae.getClass().getName(), ae.getMessage());
+ responder.doRespond(authFailedCall);
+ return false;
+ }
+ return true;
+ }
+
private synchronized void close() throws IOException {
+ disposeSasl();
data = null;
dataLengthBuffer = null;
if (!channel.isOpen())
@@ -993,16 +1183,17 @@ public abstract class Server {
Configuration conf)
throws IOException
{
- this(bindAddress, port, paramClass, handlerCount, conf, Integer.toString(port));
+ this(bindAddress, port, paramClass, handlerCount, conf, Integer.toString(port), null);
}
/** Constructs a server listening on the named port and address. Parameters passed must
* be of the named class. The <code>handlerCount</code> determines
* the number of handler threads that will be used to process calls.
*
*/
+ @SuppressWarnings("unchecked")
protected Server(String bindAddress, int port,
Class<? extends Writable> paramClass, int handlerCount,
- Configuration conf, String serverName)
+ Configuration conf, String serverName, SecretManager<? extends TokenIdentifier> secretManager)
throws IOException {
this.bindAddress = bindAddress;
this.conf = conf;
@@ -1015,6 +1206,7 @@ public abstract class Server {
this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000);
this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);
+ this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
this.authorize =
conf.getBoolean(ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG,
false);
@@ -1068,9 +1260,29 @@ public abstract class Server {
WritableUtils.writeString(out, errorClass);
WritableUtils.writeString(out, error);
}
+ wrapWithSasl(response, call);
call.setResponse(ByteBuffer.wrap(response.toByteArray()));
}
+ private void wrapWithSasl(ByteArrayOutputStream response, Call call)
+ throws IOException {
+ if (call.connection.useSasl) {
+ byte[] token = response.toByteArray();
+ // synchronization may be needed since there can be multiple Handler
+ // threads using saslServer to wrap responses.
+ synchronized (call.connection.saslServer) {
+ token = call.connection.saslServer.wrap(token, 0, token.length);
+ }
+ if (LOG.isDebugEnabled())
+ LOG.debug("Adding saslServer wrapped token of size " + token.length
+ + " as call response.");
+ response.reset();
+ DataOutputStream saslOut = new DataOutputStream(response);
+ saslOut.writeInt(token.length);
+ saslOut.write(token, 0, token.length);
+ }
+ }
+
Configuration getConf() {
return conf;
}
Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/KerberosInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/KerberosInfo.java?rev=1077150&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/KerberosInfo.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/KerberosInfo.java Fri Mar 4 03:46:18 2011
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security;
+
+import java.lang.annotation.*;
+
+/**
+ * Indicates Kerberos related information to be used
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface KerberosInfo {
+ /** Key for getting server's Kerberos principal name from Configuration */
+ String value();
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/SaslInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/SaslInputStream.java?rev=1077150&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/SaslInputStream.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/SaslInputStream.java Fri Mar 4 03:46:18 2011
@@ -0,0 +1,321 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.InputStream;
+import java.io.IOException;
+
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A SaslInputStream is composed of an InputStream and a SaslServer (or
+ * SaslClient) so that read() methods return data that are read in from the
+ * underlying InputStream but have been additionally processed by the SaslServer
+ * (or SaslClient) object. The SaslServer (or SaslClient) object must be fully
+ * initialized before being used by a SaslInputStream.
+ */
+public class SaslInputStream extends InputStream {
+ public static final Log LOG = LogFactory.getLog(SaslInputStream.class);
+
+ private final DataInputStream inStream;
+ /*
+ * data read from the underlying input stream before being processed by SASL
+ */
+ private byte[] saslToken;
+ private final SaslClient saslClient;
+ private final SaslServer saslServer;
+ private byte[] lengthBuf = new byte[4];
+ /*
+ * buffer holding data that have been processed by SASL, but have not been
+ * read out
+ */
+ private byte[] obuffer;
+ // position of the next "new" byte
+ private int ostart = 0;
+ // position of the last "new" byte
+ private int ofinish = 0;
+
+ private static int unsignedBytesToInt(byte[] buf) {
+ if (buf.length != 4) {
+ throw new IllegalArgumentException(
+ "Cannot handle byte array other than 4 bytes");
+ }
+ int result = 0;
+ for (int i = 0; i < 4; i++) {
+ result <<= 8;
+ result |= ((int) buf[i] & 0xff);
+ }
+ return result;
+ }
+
+ /**
+ * Read more data and get them processed <br>
+ * Entry condition: ostart = ofinish <br>
+ * Exit condition: ostart <= ofinish <br>
+ *
+ * return (ofinish-ostart) (we have this many bytes for you), 0 (no data now,
+ * but could have more later), or -1 (absolutely no more data)
+ */
+ private int readMoreData() throws IOException {
+ try {
+ inStream.readFully(lengthBuf);
+ int length = unsignedBytesToInt(lengthBuf);
+ if (LOG.isDebugEnabled())
+ LOG.debug("Actual length is " + length);
+ saslToken = new byte[length];
+ inStream.readFully(saslToken);
+ } catch (EOFException e) {
+ return -1;
+ }
+ try {
+ if (saslServer != null) { // using saslServer
+ obuffer = saslServer.unwrap(saslToken, 0, saslToken.length);
+ } else { // using saslClient
+ obuffer = saslClient.unwrap(saslToken, 0, saslToken.length);
+ }
+ } catch (SaslException se) {
+ try {
+ disposeSasl();
+ } catch (SaslException ignored) {
+ }
+ throw se;
+ }
+ ostart = 0;
+ if (obuffer == null)
+ ofinish = 0;
+ else
+ ofinish = obuffer.length;
+ return ofinish;
+ }
+
+ /**
+ * Disposes of any system resources or security-sensitive information Sasl
+ * might be using.
+ *
+ * @exception SaslException
+ * if a SASL error occurs.
+ */
+ private void disposeSasl() throws SaslException {
+ if (saslClient != null) {
+ saslClient.dispose();
+ }
+ if (saslServer != null) {
+ saslServer.dispose();
+ }
+ }
+
+ /**
+ * Constructs a SASLInputStream from an InputStream and a SaslServer <br>
+ * Note: if the specified InputStream or SaslServer is null, a
+ * NullPointerException may be thrown later when they are used.
+ *
+ * @param inStream
+ * the InputStream to be processed
+ * @param saslServer
+ * an initialized SaslServer object
+ */
+ public SaslInputStream(InputStream inStream, SaslServer saslServer) {
+ this.inStream = new DataInputStream(inStream);
+ this.saslServer = saslServer;
+ this.saslClient = null;
+ }
+
+ /**
+ * Constructs a SASLInputStream from an InputStream and a SaslClient <br>
+ * Note: if the specified InputStream or SaslClient is null, a
+ * NullPointerException may be thrown later when they are used.
+ *
+ * @param inStream
+ * the InputStream to be processed
+ * @param saslClient
+ * an initialized SaslClient object
+ */
+ public SaslInputStream(InputStream inStream, SaslClient saslClient) {
+ this.inStream = new DataInputStream(inStream);
+ this.saslServer = null;
+ this.saslClient = saslClient;
+ }
+
+ /**
+ * Reads the next byte of data from this input stream. The value byte is
+ * returned as an <code>int</code> in the range <code>0</code> to
+ * <code>255</code>. If no byte is available because the end of the stream has
+ * been reached, the value <code>-1</code> is returned. This method blocks
+ * until input data is available, the end of the stream is detected, or an
+ * exception is thrown.
+ * <p>
+ *
+ * @return the next byte of data, or <code>-1</code> if the end of the stream
+ * is reached.
+ * @exception IOException
+ * if an I/O error occurs.
+ */
+ public int read() throws IOException {
+ if (ostart >= ofinish) {
+ // we loop for new data as we are blocking
+ int i = 0;
+ while (i == 0)
+ i = readMoreData();
+ if (i == -1)
+ return -1;
+ }
+ return ((int) obuffer[ostart++] & 0xff);
+ }
+
+ /**
+ * Reads up to <code>b.length</code> bytes of data from this input stream into
+ * an array of bytes.
+ * <p>
+ * The <code>read</code> method of <code>InputStream</code> calls the
+ * <code>read</code> method of three arguments with the arguments
+ * <code>b</code>, <code>0</code>, and <code>b.length</code>.
+ *
+ * @param b
+ * the buffer into which the data is read.
+ * @return the total number of bytes read into the buffer, or <code>-1</code>
+ * if there is no more data because the end of the stream has been
+ * reached.
+ * @exception IOException
+ * if an I/O error occurs.
+ */
+ public int read(byte[] b) throws IOException {
+ return read(b, 0, b.length);
+ }
+
+ /**
+ * Reads up to <code>len</code> bytes of data from this input stream into an
+ * array of bytes. This method blocks until some input is available. If the
+ * first argument is <code>null,</code> up to <code>len</code> bytes are read
+ * and discarded.
+ *
+ * @param b
+ * the buffer into which the data is read.
+ * @param off
+ * the start offset of the data.
+ * @param len
+ * the maximum number of bytes read.
+ * @return the total number of bytes read into the buffer, or <code>-1</code>
+ * if there is no more data because the end of the stream has been
+ * reached.
+ * @exception IOException
+ * if an I/O error occurs.
+ */
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (ostart >= ofinish) {
+ // we loop for new data as we are blocking
+ int i = 0;
+ while (i == 0)
+ i = readMoreData();
+ if (i == -1)
+ return -1;
+ }
+ if (len <= 0) {
+ return 0;
+ }
+ int available = ofinish - ostart;
+ if (len < available)
+ available = len;
+ if (b != null) {
+ System.arraycopy(obuffer, ostart, b, off, available);
+ }
+ ostart = ostart + available;
+ return available;
+ }
+
+ /**
+ * Skips <code>n</code> bytes of input from the bytes that can be read from
+ * this input stream without blocking.
+ *
+ * <p>
+ * Fewer bytes than requested might be skipped. The actual number of bytes
+ * skipped is equal to <code>n</code> or the result of a call to
+ * {@link #available() <code>available</code>}, whichever is smaller. If
+ * <code>n</code> is less than zero, no bytes are skipped.
+ *
+ * <p>
+ * The actual number of bytes skipped is returned.
+ *
+ * @param n
+ * the number of bytes to be skipped.
+ * @return the actual number of bytes skipped.
+ * @exception IOException
+ * if an I/O error occurs.
+ */
+ public long skip(long n) throws IOException {
+ int available = ofinish - ostart;
+ if (n > available) {
+ n = available;
+ }
+ if (n < 0) {
+ return 0;
+ }
+ ostart += n;
+ return n;
+ }
+
+ /**
+ * Returns the number of bytes that can be read from this input stream without
+ * blocking. The <code>available</code> method of <code>InputStream</code>
+ * returns <code>0</code>. This method <B>should</B> be overridden by
+ * subclasses.
+ *
+ * @return the number of bytes that can be read from this input stream without
+ * blocking.
+ * @exception IOException
+ * if an I/O error occurs.
+ */
+ public int available() throws IOException {
+ return (ofinish - ostart);
+ }
+
+ /**
+ * Closes this input stream and releases any system resources associated with
+ * the stream.
+ * <p>
+ * The <code>close</code> method of <code>SASLInputStream</code> calls the
+ * <code>close</code> method of its underlying input stream.
+ *
+ * @exception IOException
+ * if an I/O error occurs.
+ */
+ public void close() throws IOException {
+ disposeSasl();
+ ostart = 0;
+ ofinish = 0;
+ inStream.close();
+ }
+
+ /**
+ * Tests if this input stream supports the <code>mark</code> and
+ * <code>reset</code> methods, which it does not.
+ *
+ * @return <code>false</code>, since this class does not support the
+ * <code>mark</code> and <code>reset</code> methods.
+ */
+ public boolean markSupported() {
+ return false;
+ }
+}
\ No newline at end of file
Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/SaslOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/SaslOutputStream.java?rev=1077150&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/SaslOutputStream.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/SaslOutputStream.java Fri Mar 4 03:46:18 2011
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+/**
+ * A SaslOutputStream is composed of an OutputStream and a SaslServer (or
+ * SaslClient) so that write() methods first process the data before writing
+ * them out to the underlying OutputStream. The SaslServer (or SaslClient)
+ * object must be fully initialized before being used by a SaslOutputStream.
+ */
+public class SaslOutputStream extends OutputStream {
+
+ private final DataOutputStream outStream;
+ // processed data ready to be written out
+ private byte[] saslToken;
+
+ private final SaslClient saslClient;
+ private final SaslServer saslServer;
+ // buffer holding one byte of incoming data
+ private final byte[] ibuffer = new byte[1];
+
+ /**
+ * Constructs a SASLOutputStream from an OutputStream and a SaslServer <br>
+ * Note: if the specified OutputStream or SaslServer is null, a
+ * NullPointerException may be thrown later when they are used.
+ *
+ * @param outStream
+ * the OutputStream to be processed
+ * @param saslServer
+ * an initialized SaslServer object
+ */
+ public SaslOutputStream(OutputStream outStream, SaslServer saslServer) {
+ this.outStream = new DataOutputStream(outStream);
+ this.saslServer = saslServer;
+ this.saslClient = null;
+ }
+
+ /**
+ * Constructs a SASLOutputStream from an OutputStream and a SaslClient <br>
+ * Note: if the specified OutputStream or SaslClient is null, a
+ * NullPointerException may be thrown later when they are used.
+ *
+ * @param outStream
+ * the OutputStream to be processed
+ * @param saslClient
+ * an initialized SaslClient object
+ */
+ public SaslOutputStream(OutputStream outStream, SaslClient saslClient) {
+ this.outStream = new DataOutputStream(outStream);
+ this.saslServer = null;
+ this.saslClient = saslClient;
+ }
+
+ /**
+ * Disposes of any system resources or security-sensitive information Sasl
+ * might be using.
+ *
+ * @exception SaslException
+ * if a SASL error occurs.
+ */
+ private void disposeSasl() throws SaslException {
+ if (saslClient != null) {
+ saslClient.dispose();
+ }
+ if (saslServer != null) {
+ saslServer.dispose();
+ }
+ }
+
+ /**
+ * Writes the specified byte to this output stream.
+ *
+ * @param b
+ * the <code>byte</code>.
+ * @exception IOException
+ * if an I/O error occurs.
+ */
+ public void write(int b) throws IOException {
+ ibuffer[0] = (byte) b;
+ write(ibuffer, 0, 1);
+ }
+
+ /**
+ * Writes <code>b.length</code> bytes from the specified byte array to this
+ * output stream.
+ * <p>
+ * The <code>write</code> method of <code>SASLOutputStream</code> calls the
+ * <code>write</code> method of three arguments with the three arguments
+ * <code>b</code>, <code>0</code>, and <code>b.length</code>.
+ *
+ * @param b
+ * the data.
+ * @exception NullPointerException
+ * if <code>b</code> is null.
+ * @exception IOException
+ * if an I/O error occurs.
+ */
+ public void write(byte[] b) throws IOException {
+ write(b, 0, b.length);
+ }
+
+ /**
+ * Writes <code>len</code> bytes from the specified byte array starting at
+ * offset <code>off</code> to this output stream.
+ *
+ * @param inBuf
+ * the data.
+ * @param off
+ * the start offset in the data.
+ * @param len
+ * the number of bytes to write.
+ * @exception IOException
+ * if an I/O error occurs.
+ */
+ public void write(byte[] inBuf, int off, int len) throws IOException {
+ try {
+ if (saslServer != null) { // using saslServer
+ saslToken = saslServer.wrap(inBuf, off, len);
+ } else { // using saslClient
+ saslToken = saslClient.wrap(inBuf, off, len);
+ }
+ } catch (SaslException se) {
+ try {
+ disposeSasl();
+ } catch (SaslException ignored) {
+ }
+ throw se;
+ }
+ if (saslToken != null) {
+ outStream.writeInt(saslToken.length);
+ outStream.write(saslToken, 0, saslToken.length);
+ saslToken = null;
+ }
+ }
+
+ /**
+ * Flushes this output stream
+ *
+ * @exception IOException
+ * if an I/O error occurs.
+ */
+ public void flush() throws IOException {
+ outStream.flush();
+ }
+
+ /**
+ * Closes this output stream and releases any system resources associated with
+ * this stream.
+ *
+ * @exception IOException
+ * if an I/O error occurs.
+ */
+ public void close() throws IOException {
+ disposeSasl();
+ outStream.close();
+ }
+}
\ No newline at end of file
Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/SaslRpcClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/SaslRpcClient.java?rev=1077150&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/SaslRpcClient.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/SaslRpcClient.java Fri Mar 4 03:46:18 2011
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.RealmChoiceCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslClient;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+
+/**
+ * A utility class that encapsulates SASL logic for RPC client
+ */
+public class SaslRpcClient {
+ public static final Log LOG = LogFactory.getLog(SaslRpcClient.class);
+
+ private final SaslClient saslClient;
+
+ /**
+ * Create a SaslRpcClient for an authentication method
+ *
+ * @param method
+ * the requested authentication method
+ * @param token
+ * token to use if needed by the authentication method
+ */
+ public SaslRpcClient(AuthMethod method,
+ Token<? extends TokenIdentifier> token, String serverPrincipal)
+ throws IOException {
+ switch (method) {
+ case DIGEST:
+ if (LOG.isDebugEnabled())
+ LOG.debug("Creating SASL " + AuthMethod.DIGEST.getMechanismName()
+ + " client to authenticate to service at " + token.getService());
+ saslClient = Sasl.createSaslClient(new String[] { AuthMethod.DIGEST
+ .getMechanismName() }, null, null, SaslRpcServer.SASL_DEFAULT_REALM,
+ SaslRpcServer.SASL_PROPS, new SaslClientCallbackHandler(token));
+ break;
+ case KERBEROS:
+ if (LOG.isDebugEnabled()) {
+ LOG
+ .debug("Creating SASL " + AuthMethod.KERBEROS.getMechanismName()
+ + " client. Server's Kerberos principal name is "
+ + serverPrincipal);
+ }
+ if (serverPrincipal == null || serverPrincipal.length() == 0) {
+ throw new IOException(
+ "Failed to specify server's Kerberos principal name");
+ }
+ String names[] = SaslRpcServer.splitKerberosName(serverPrincipal);
+ if (names.length != 3) {
+ throw new IOException(
+ "Kerberos principal name does NOT have the expected hostname part: "
+ + serverPrincipal);
+ }
+ saslClient = Sasl.createSaslClient(new String[] { AuthMethod.KERBEROS
+ .getMechanismName() }, null, names[0], names[1],
+ SaslRpcServer.SASL_PROPS, null);
+ break;
+ default:
+ throw new IOException("Unknown authentication method " + method);
+ }
+ if (saslClient == null)
+ throw new IOException("Unable to find SASL client implementation");
+ }
+
+ /**
+ * Do client side SASL authentication with server via the given InputStream
+ * and OutputStream.
+ *
+ * Wire format: each token is sent as a 4-byte big-endian length followed by
+ * that many token bytes; the exchange alternates send/receive until the
+ * SaslClient reports completion. On IOException the saslClient is disposed
+ * before rethrowing, so this object must not be reused after a failure.
+ *
+ * @param inS
+ * InputStream to use
+ * @param outS
+ * OutputStream to use
+ * @throws IOException
+ */
+ public void saslConnect(InputStream inS, OutputStream outS)
+ throws IOException {
+ DataInputStream inStream = new DataInputStream(new BufferedInputStream(inS));
+ DataOutputStream outStream = new DataOutputStream(new BufferedOutputStream(
+ outS));
+
+ try {
+ // Some mechanisms (e.g. DIGEST-MD5) wait for a server challenge first;
+ // others provide an initial response computed from an empty challenge.
+ byte[] saslToken = new byte[0];
+ if (saslClient.hasInitialResponse())
+ saslToken = saslClient.evaluateChallenge(saslToken);
+ if (saslToken != null) {
+ // Length-prefixed frame, flushed so the server can respond.
+ outStream.writeInt(saslToken.length);
+ outStream.write(saslToken, 0, saslToken.length);
+ outStream.flush();
+ if (LOG.isDebugEnabled())
+ LOG.debug("Have sent token of size " + saslToken.length
+ + " from initSASLContext.");
+ }
+ if (!saslClient.isComplete()) {
+ // Read the server's first challenge before entering the loop.
+ saslToken = new byte[inStream.readInt()];
+ if (LOG.isDebugEnabled())
+ LOG.debug("Will read input token of size " + saslToken.length
+ + " for processing by initSASLContext");
+ inStream.readFully(saslToken);
+ }
+
+ // Alternate evaluate/send/receive until the SASL negotiation is done.
+ // A null response means there is nothing to send for this round.
+ while (!saslClient.isComplete()) {
+ saslToken = saslClient.evaluateChallenge(saslToken);
+ if (saslToken != null) {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Will send token of size " + saslToken.length
+ + " from initSASLContext.");
+ outStream.writeInt(saslToken.length);
+ outStream.write(saslToken, 0, saslToken.length);
+ outStream.flush();
+ }
+ if (!saslClient.isComplete()) {
+ saslToken = new byte[inStream.readInt()];
+ if (LOG.isDebugEnabled())
+ LOG.debug("Will read input token of size " + saslToken.length
+ + " for processing by initSASLContext");
+ inStream.readFully(saslToken);
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("SASL client context established. Negotiated QoP: "
+ + saslClient.getNegotiatedProperty(Sasl.QOP));
+ }
+ } catch (IOException e) {
+ // Free native/SASL resources on failure; the exception still propagates.
+ saslClient.dispose();
+ throw e;
+ }
+ }
+
+ /**
+ * Obtain an InputStream that unwraps data through the negotiated SASL
+ * context. Valid only after saslConnect() has completed the exchange.
+ *
+ * @param in
+ * the InputStream to wrap
+ * @return a SASL wrapped InputStream
+ * @throws IOException if the SASL negotiation has not finished
+ */
+ public InputStream getInputStream(InputStream in) throws IOException {
+ final boolean negotiated = saslClient.isComplete();
+ if (!negotiated) {
+ throw new IOException("Sasl authentication exchange hasn't completed yet");
+ }
+ return new SaslInputStream(in, saslClient);
+ }
+
+ /**
+ * Obtain an OutputStream that wraps outgoing data through the negotiated
+ * SASL context. Valid only after saslConnect() has completed the exchange.
+ *
+ * @param out
+ * the OutputStream to wrap
+ * @return a SASL wrapped OutputStream
+ * @throws IOException if the SASL negotiation has not finished
+ */
+ public OutputStream getOutputStream(OutputStream out) throws IOException {
+ final boolean negotiated = saslClient.isComplete();
+ if (!negotiated) {
+ throw new IOException("Sasl authentication exchange hasn't completed yet");
+ }
+ return new SaslOutputStream(out, saslClient);
+ }
+
+ /**
+ * Release resources used by the wrapped saslClient.
+ *
+ * @throws SaslException if the underlying SaslClient fails to dispose
+ */
+ public void dispose() throws SaslException {
+ saslClient.dispose();
+ }
+
+ /**
+ * CallbackHandler answering SASL client callbacks from the token's
+ * identifier (as username) and password, both Base64-encoded by
+ * SaslRpcServer helpers.
+ */
+ private static class SaslClientCallbackHandler implements CallbackHandler {
+ private final String userName;
+ private final char[] userPassword;
+
+ public SaslClientCallbackHandler(Token<? extends TokenIdentifier> token) {
+ this.userName = SaslRpcServer.encodeIdentifier(token.getIdentifier());
+ this.userPassword = SaslRpcServer.encodePassword(token.getPassword());
+ }
+
+ public void handle(Callback[] callbacks)
+ throws UnsupportedCallbackException {
+ NameCallback nameCb = null;
+ PasswordCallback passwordCb = null;
+ RealmCallback realmCb = null;
+ for (Callback cb : callbacks) {
+ if (cb instanceof RealmChoiceCallback) {
+ // Accept whatever realm choice the mechanism proposes.
+ continue;
+ }
+ if (cb instanceof NameCallback) {
+ nameCb = (NameCallback) cb;
+ } else if (cb instanceof PasswordCallback) {
+ passwordCb = (PasswordCallback) cb;
+ } else if (cb instanceof RealmCallback) {
+ realmCb = (RealmCallback) cb;
+ } else {
+ throw new UnsupportedCallbackException(cb,
+ "Unrecognized SASL client callback");
+ }
+ }
+ if (nameCb != null) {
+ if (LOG.isDebugEnabled())
+ LOG.debug("SASL client callback: setting username: " + userName);
+ nameCb.setName(userName);
+ }
+ if (passwordCb != null) {
+ if (LOG.isDebugEnabled())
+ LOG.debug("SASL client callback: setting userPassword");
+ passwordCb.setPassword(userPassword);
+ }
+ if (realmCb != null) {
+ if (LOG.isDebugEnabled())
+ LOG.debug("SASL client callback: setting realm: "
+ + realmCb.getDefaultText());
+ realmCb.setText(realmCb.getDefaultText());
+ }
+ }
+ }
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/SaslRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/SaslRpcServer.java?rev=1077150&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/SaslRpcServer.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/SaslRpcServer.java Fri Mar 4 03:46:18 2011
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.TreeMap;
+import java.util.Map;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.Sasl;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.TokenIdentifier;
+
+/**
+ * A utility class for dealing with SASL on RPC server
+ */
+public class SaslRpcServer {
+ public static final Log LOG = LogFactory.getLog(SaslRpcServer.class);
+ public static final String SASL_DEFAULT_REALM = "default";
+ // Shared by both client and server sides to negotiate identical properties.
+ public static final Map<String, String> SASL_PROPS =
+ new TreeMap<String, String>();
+ static {
+ // Request authentication plus integrity protection
+ SASL_PROPS.put(Sasl.QOP, "auth-int");
+ // Request mutual authentication
+ SASL_PROPS.put(Sasl.SERVER_AUTH, "true");
+ }
+
+ /** Base64-encode a token identifier into a SASL username string. */
+ // NOTE(review): uses the platform default charset; Base64 output is ASCII
+ // so this is safe on ASCII-compatible platforms — consider an explicit
+ // charset for full portability.
+ static String encodeIdentifier(byte[] identifier) {
+ return new String(Base64.encodeBase64(identifier));
+ }
+
+ /** Decode a SASL username string back into token identifier bytes. */
+ static byte[] decodeIdentifier(String identifier) {
+ return Base64.decodeBase64(identifier.getBytes());
+ }
+
+ /** Base64-encode a token password into a SASL password char array. */
+ static char[] encodePassword(byte[] password) {
+ return new String(Base64.encodeBase64(password)).toCharArray();
+ }
+
+ /** Splitting fully qualified Kerberos name into parts */
+ public static String[] splitKerberosName(String fullName) {
+ // "primary/instance@REALM" -> { primary, instance, REALM }
+ return fullName.split("[/@]");
+ }
+
+ /** Authentication method */
+ public static enum AuthMethod {
+ SIMPLE((byte) 80, ""), // no authentication
+ KERBEROS((byte) 81, "GSSAPI"), // SASL Kerberos authentication
+ DIGEST((byte) 82, "DIGEST-MD5"); // SASL DIGEST-MD5 authentication
+
+ /** The code for this method. */
+ public final byte code;
+ public final String mechanismName;
+
+ private AuthMethod(byte code, String mechanismName) {
+ this.code = code;
+ this.mechanismName = mechanismName;
+ }
+
+ private static final int FIRST_CODE = values()[0].code;
+
+ /** Return the object represented by the code. */
+ // Assumes codes are contiguous and ascending from FIRST_CODE (80..82
+ // above); returns null for any byte outside that range.
+ private static AuthMethod valueOf(byte code) {
+ final int i = (code & 0xff) - FIRST_CODE;
+ return i < 0 || i >= values().length ? null : values()[i];
+ }
+
+ /** Return the SASL mechanism name */
+ public String getMechanismName() {
+ return mechanismName;
+ }
+
+ /** Read from in */
+ public static AuthMethod read(DataInput in) throws IOException {
+ return valueOf(in.readByte());
+ }
+
+ /** Write to out */
+ public void write(DataOutput out) throws IOException {
+ // DataOutput.write(int) emits the low-order byte only.
+ out.write(code);
+ }
+ };
+
+ /** CallbackHandler for SASL DIGEST-MD5 mechanism */
+ public static class SaslDigestCallbackHandler implements CallbackHandler {
+ private SecretManager<TokenIdentifier> secretManager;
+
+ public SaslDigestCallbackHandler(
+ SecretManager<TokenIdentifier> secretManager) {
+ this.secretManager = secretManager;
+ }
+
+ /** Deserialize the Base64-encoded SASL username back into a
+ * TokenIdentifier via the secret manager's empty-identifier factory. */
+ private TokenIdentifier getIdentifier(String id) throws IOException {
+ byte[] tokenId = decodeIdentifier(id);
+ TokenIdentifier tokenIdentifier = secretManager.createIdentifier();
+ tokenIdentifier.readFields(new DataInputStream(new ByteArrayInputStream(
+ tokenId)));
+ return tokenIdentifier;
+ }
+
+ /** Look up the token's password for the given identifier. */
+ private char[] getPassword(TokenIdentifier tokenid) throws IOException {
+ return encodePassword(secretManager.retrievePassword(tokenid));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void handle(Callback[] callbacks) throws IOException,
+ UnsupportedCallbackException {
+ NameCallback nc = null;
+ PasswordCallback pc = null;
+ AuthorizeCallback ac = null;
+ for (Callback callback : callbacks) {
+ if (callback instanceof AuthorizeCallback) {
+ ac = (AuthorizeCallback) callback;
+ } else if (callback instanceof NameCallback) {
+ nc = (NameCallback) callback;
+ } else if (callback instanceof PasswordCallback) {
+ pc = (PasswordCallback) callback;
+ } else if (callback instanceof RealmCallback) {
+ continue; // realm is ignored
+ } else {
+ throw new UnsupportedCallbackException(callback,
+ "Unrecognized SASL DIGEST-MD5 Callback");
+ }
+ }
+ if (pc != null) {
+ // NOTE(review): assumes DIGEST-MD5 always pairs a NameCallback with
+ // the PasswordCallback; nc would NPE here otherwise — confirm.
+ TokenIdentifier tokenIdentifier = getIdentifier(nc.getDefaultName());
+ char[] password = getPassword(tokenIdentifier);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("SASL server DIGEST-MD5 callback: setting password "
+ + "for client: " + tokenIdentifier.getUsername());
+ }
+ pc.setPassword(password);
+ }
+ if (ac != null) {
+ String authid = ac.getAuthenticationID();
+ String authzid = ac.getAuthorizationID();
+ // Only self-authorization is permitted: the authenticated identity
+ // must match the requested authorization identity.
+ if (authid.equals(authzid)) {
+ ac.setAuthorized(true);
+ } else {
+ ac.setAuthorized(false);
+ }
+ if (ac.isAuthorized()) {
+ String username = getIdentifier(authzid).getUsername().toString();
+ if (LOG.isDebugEnabled())
+ LOG.debug("SASL server DIGEST-MD5 callback: setting "
+ + "canonicalized client ID: " + username);
+ ac.setAuthorizedID(username);
+ }
+ }
+ }
+ }
+
+ /** CallbackHandler for SASL GSSAPI Kerberos mechanism */
+ public static class SaslGssCallbackHandler implements CallbackHandler {
+
+ /** {@inheritDoc} */
+ @Override
+ public void handle(Callback[] callbacks) throws IOException,
+ UnsupportedCallbackException {
+ AuthorizeCallback ac = null;
+ for (Callback callback : callbacks) {
+ if (callback instanceof AuthorizeCallback) {
+ ac = (AuthorizeCallback) callback;
+ } else {
+ // GSSAPI handles name/password internally; anything else is a bug.
+ throw new UnsupportedCallbackException(callback,
+ "Unrecognized SASL GSSAPI Callback");
+ }
+ }
+ if (ac != null) {
+ String authid = ac.getAuthenticationID();
+ String authzid = ac.getAuthorizationID();
+ // Only self-authorization is permitted, as in the DIGEST handler.
+ if (authid.equals(authzid)) {
+ ac.setAuthorized(true);
+ } else {
+ ac.setAuthorized(false);
+ }
+ if (ac.isAuthorized()) {
+ if (LOG.isDebugEnabled())
+ LOG.debug("SASL server GSSAPI callback: setting "
+ + "canonicalized client ID: " + authzid);
+ ac.setAuthorizedID(authzid);
+ }
+ }
+ }
+ }
+}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/UserGroupInformation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/UserGroupInformation.java?rev=1077150&r1=1077149&r2=1077150&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/UserGroupInformation.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/UserGroupInformation.java Fri Mar 4 03:46:18 2011
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.security;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION;
+
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.security.AccessControlContext;
@@ -61,7 +63,6 @@ import com.sun.security.auth.module.Krb5
*/
public class UserGroupInformation {
private static final Log LOG = LogFactory.getLog(UserGroupInformation.class);
- private static final String HADOOP_SECURITY_AUTHENTICATION = "hadoop.security.authentication";
/**
* A login module that looks at the Kerberos, Unix, or Windows principal and
@@ -454,14 +455,13 @@ public class UserGroupInformation {
*
* @return an unmodifiable collection of tokens associated with user
*/
- @SuppressWarnings("unchecked")
- public synchronized <Ident extends TokenIdentifier>
- Collection<Token<Ident>> getTokens() {
+ public synchronized
+ Collection<Token<? extends TokenIdentifier>> getTokens() {
Set<Object> creds = subject.getPrivateCredentials();
- List<Token<Ident>> result = new ArrayList<Token<Ident>>(creds.size());
+ List<Token<?>> result = new ArrayList<Token<?>>(creds.size());
for(Object o: creds) {
- if (o instanceof Token) {
- result.add((Token<Ident>) o);
+ if (o instanceof Token<?>) {
+ result.add((Token<?>) o);
}
}
return Collections.unmodifiableList(result);
Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/token/SecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/token/SecretManager.java?rev=1077150&r1=1077149&r2=1077150&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/token/SecretManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/token/SecretManager.java Fri Mar 4 03:46:18 2011
@@ -62,6 +62,12 @@ public abstract class SecretManager<T ex
public abstract byte[] retrievePassword(T identifier) throws InvalidToken;
/**
+ * Create an empty token identifier.
+ * @return the newly created empty token identifier
+ */
+ public abstract T createIdentifier();
+
+ /**
* The name of the hashing algorithm.
*/
private static final String DEFAULT_HMAC_ALGORITHM = "HmacSHA1";
Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/token/TokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/token/TokenIdentifier.java?rev=1077150&r1=1077149&r2=1077150&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/token/TokenIdentifier.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/token/TokenIdentifier.java Fri Mar 4 03:46:18 2011
@@ -35,6 +35,12 @@ public abstract class TokenIdentifier im
* @return the kind of the token
*/
public abstract Text getKind();
+
+ /**
+ * Get the username encoded in the token identifier
+ * @return the username
+ */
+ public abstract Text getUsername();
/**
* Get the bytes for the token identifier