You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by at...@apache.org on 2012/08/07 18:40:12 UTC
svn commit: r1370354 [1/2] - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/protocol/
src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/
src/main/java/org/...
Author: atm
Date: Tue Aug 7 16:40:03 2012
New Revision: 1370354
URL: http://svn.apache.org/viewvc?rev=1370354&view=rev
Log:
HDFS-3637. Add support for encrypting the DataTransferProtocol. Contributed by Aaron T. Myers.
Added:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferEncryptor.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/InvalidEncryptionKeyException.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/DataEncryptionKey.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithEncryptedTransfer.java
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1370354&r1=1370353&r2=1370354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue Aug 7 16:40:03 2012
@@ -203,6 +203,8 @@ Branch-2 ( Unreleased changes )
HDFS-3513. HttpFS should cache filesystems. (tucu)
+ HDFS-3637. Add support for encrypting the DataTransferProtocol. (atm)
+
IMPROVEMENTS
HDFS-3390. DFSAdmin should print full stack traces of errors when DEBUG
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java?rev=1370354&r1=1370353&r2=1370354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java Tue Aug 7 16:40:03 2012
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.net.Socket;
import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
/**
* A BlockReader is responsible for reading a single block
@@ -71,4 +72,8 @@ public interface BlockReader extends Byt
*/
boolean hasSentStatusCode();
+ /**
+ * @return a reference to the streams this block reader is using.
+ */
+ IOStreamPair getStreams();
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java?rev=1370354&r1=1370353&r2=1370354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java Tue Aug 7 16:40:03 2012
@@ -25,7 +25,12 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSClient.Conf;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
@@ -41,12 +46,13 @@ public class BlockReaderFactory {
Configuration conf,
Socket sock, String file,
ExtendedBlock block, Token<BlockTokenIdentifier> blockToken,
- long startOffset, long len) throws IOException {
+ long startOffset, long len, DataEncryptionKey encryptionKey)
+ throws IOException {
int bufferSize = conf.getInt(DFSConfigKeys.IO_FILE_BUFFER_SIZE_KEY,
DFSConfigKeys.IO_FILE_BUFFER_SIZE_DEFAULT);
return newBlockReader(new Conf(conf),
sock, file, block, blockToken, startOffset,
- len, bufferSize, true, "");
+ len, bufferSize, true, "", encryptionKey, null);
}
/**
@@ -73,14 +79,32 @@ public class BlockReaderFactory {
Token<BlockTokenIdentifier> blockToken,
long startOffset, long len,
int bufferSize, boolean verifyChecksum,
- String clientName)
+ String clientName,
+ DataEncryptionKey encryptionKey,
+ IOStreamPair ioStreams)
throws IOException {
+
if (conf.useLegacyBlockReader) {
+ if (encryptionKey != null) {
+ throw new RuntimeException("Encryption is not supported with the legacy block reader.");
+ }
return RemoteBlockReader.newBlockReader(
sock, file, block, blockToken, startOffset, len, bufferSize, verifyChecksum, clientName);
} else {
+ if (ioStreams == null) {
+ ioStreams = new IOStreamPair(NetUtils.getInputStream(sock),
+ NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT));
+ if (encryptionKey != null) {
+ IOStreamPair encryptedStreams =
+ DataTransferEncryptor.getEncryptedStreams(
+ ioStreams.out, ioStreams.in, encryptionKey);
+ ioStreams = encryptedStreams;
+ }
+ }
+
return RemoteBlockReader2.newBlockReader(
- sock, file, block, blockToken, startOffset, len, bufferSize, verifyChecksum, clientName);
+ sock, file, block, blockToken, startOffset, len, bufferSize,
+ verifyChecksum, clientName, encryptionKey, ioStreams);
}
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java?rev=1370354&r1=1370353&r2=1370354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java Tue Aug 7 16:40:03 2012
@@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.protocol.B
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.util.DirectBufferPool;
@@ -681,4 +682,9 @@ class BlockReaderLocal implements BlockR
public boolean hasSentStatusCode() {
return false;
}
+
+ @Override
+ public IOStreamPair getStreams() {
+ return null;
+ }
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1370354&r1=1370353&r2=1370354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Tue Aug 7 16:40:03 2012
@@ -45,6 +45,8 @@ import static org.apache.hadoop.hdfs.DFS
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
@@ -53,6 +55,7 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@@ -107,12 +110,15 @@ import org.apache.hadoop.hdfs.protocol.L
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@@ -179,6 +185,7 @@ public class DFSClient implements java.i
final Conf dfsClientConf;
private Random r = new Random();
private SocketAddress[] localInterfaceAddrs;
+ private DataEncryptionKey encryptionKey;
/**
* DFSClient configuration
@@ -348,9 +355,6 @@ public class DFSClient implements java.i
this.clientName = "DFSClient_" + dfsClientConf.taskId + "_" +
DFSUtil.getRandom().nextInt() + "_" + Thread.currentThread().getId();
- this.socketCache = new SocketCache(dfsClientConf.socketCacheCapacity);
-
-
if (rpcNamenode != null) {
// This case is used for testing.
Preconditions.checkArgument(nameNodeUri == null);
@@ -380,6 +384,8 @@ public class DFSClient implements java.i
Joiner.on(',').join(localInterfaces)+ "] with addresses [" +
Joiner.on(',').join(localInterfaceAddrs) + "]");
}
+
+ this.socketCache = new SocketCache(dfsClientConf.socketCacheCapacity);
}
/**
@@ -1418,7 +1424,44 @@ public class DFSClient implements java.i
*/
public MD5MD5CRC32FileChecksum getFileChecksum(String src) throws IOException {
checkOpen();
- return getFileChecksum(src, namenode, socketFactory, dfsClientConf.socketTimeout);
+ return getFileChecksum(src, namenode, socketFactory,
+ dfsClientConf.socketTimeout, getDataEncryptionKey());
+ }
+
+ @InterfaceAudience.Private
+ public void clearDataEncryptionKey() {
+ LOG.debug("Clearing encryption key");
+ synchronized (this) {
+ encryptionKey = null;
+ }
+ }
+
+ /**
+ * @return true if data sent between this client and DNs should be encrypted,
+ * false otherwise.
+ * @throws IOException in the event of error communicating with the NN
+ */
+ boolean shouldEncryptData() throws IOException {
+ FsServerDefaults d = getServerDefaults();
+ return d == null ? false : d.getEncryptDataTransfer();
+ }
+
+ @InterfaceAudience.Private
+ public DataEncryptionKey getDataEncryptionKey()
+ throws IOException {
+ if (shouldEncryptData()) {
+ synchronized (this) {
+ if (encryptionKey == null ||
+ (encryptionKey != null &&
+ encryptionKey.expiryDate < Time.now())) {
+ LOG.debug("Getting new encryption token from NN");
+ encryptionKey = namenode.getDataEncryptionKey();
+ }
+ return encryptionKey;
+ }
+ } else {
+ return null;
+ }
}
/**
@@ -1427,8 +1470,8 @@ public class DFSClient implements java.i
* @return The checksum
*/
public static MD5MD5CRC32FileChecksum getFileChecksum(String src,
- ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout
- ) throws IOException {
+ ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout,
+ DataEncryptionKey encryptionKey) throws IOException {
//get all block locations
LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0, Long.MAX_VALUE);
if (null == blockLocations) {
@@ -1471,10 +1514,18 @@ public class DFSClient implements java.i
timeout);
sock.setSoTimeout(timeout);
- out = new DataOutputStream(
- new BufferedOutputStream(NetUtils.getOutputStream(sock),
- HdfsConstants.SMALL_BUFFER_SIZE));
- in = new DataInputStream(NetUtils.getInputStream(sock));
+ OutputStream unbufOut = NetUtils.getOutputStream(sock);
+ InputStream unbufIn = NetUtils.getInputStream(sock);
+ if (encryptionKey != null) {
+ IOStreamPair encryptedStreams =
+ DataTransferEncryptor.getEncryptedStreams(
+ unbufOut, unbufIn, encryptionKey);
+ unbufOut = encryptedStreams.out;
+ unbufIn = encryptedStreams.in;
+ }
+ out = new DataOutputStream(new BufferedOutputStream(unbufOut,
+ HdfsConstants.SMALL_BUFFER_SIZE));
+ in = new DataInputStream(unbufIn);
if (LOG.isDebugEnabled()) {
LOG.debug("write to " + datanodes[j] + ": "
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1370354&r1=1370353&r2=1370354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Tue Aug 7 16:40:03 2012
@@ -367,4 +367,9 @@ public class DFSConfigKeys extends Commo
public static final boolean DFS_HA_AUTO_FAILOVER_ENABLED_DEFAULT = false;
public static final String DFS_HA_ZKFC_PORT_KEY = "dfs.ha.zkfc.port";
public static final int DFS_HA_ZKFC_PORT_DEFAULT = 8019;
+
+ // Security-related configs
+ public static final String DFS_ENCRYPT_DATA_TRANSFER_KEY = "dfs.encrypt.data.transfer";
+ public static final boolean DFS_ENCRYPT_DATA_TRANSFER_DEFAULT = false;
+ public static final String DFS_DATA_ENCRYPTION_ALGORITHM_KEY = "dfs.encrypt.data.transfer.algorithm";
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1370354&r1=1370353&r2=1370354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Tue Aug 7 16:40:03 2012
@@ -37,11 +37,14 @@ import org.apache.hadoop.fs.ChecksumExce
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.hdfs.SocketCache.SocketAndStreams;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
@@ -425,6 +428,7 @@ public class DFSInputStream extends FSIn
//
DatanodeInfo chosenNode = null;
int refetchToken = 1; // only need to get a new access token once
+ int refetchEncryptionKey = 1; // only need to get a new encryption key once
boolean connectFailedOnce = false;
@@ -452,7 +456,14 @@ public class DFSInputStream extends FSIn
}
return chosenNode;
} catch (IOException ex) {
- if (ex instanceof InvalidBlockTokenException && refetchToken > 0) {
+ if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
+ DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+ + "encryption key was invalid when connecting to " + targetAddr
+ + " : " + ex);
+ // The encryption key used is invalid.
+ refetchEncryptionKey--;
+ dfsClient.clearDataEncryptionKey();
+ } else if (ex instanceof InvalidBlockTokenException && refetchToken > 0) {
DFSClient.LOG.info("Will fetch a new access token and retry, "
+ "access token was invalid when connecting to " + targetAddr
+ " : " + ex);
@@ -754,6 +765,7 @@ public class DFSInputStream extends FSIn
// Connect to best DataNode for desired Block, with potential offset
//
int refetchToken = 1; // only need to get a new access token once
+ int refetchEncryptionKey = 1; // only need to get a new encryption key once
while (true) {
// cached block locations may have been updated by chooseDataNode()
@@ -789,7 +801,14 @@ public class DFSInputStream extends FSIn
dfsClient.disableShortCircuit();
continue;
} catch (IOException e) {
- if (e instanceof InvalidBlockTokenException && refetchToken > 0) {
+ if (e instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
+ DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+ + "encryption key was invalid when connecting to " + targetAddr
+ + " : " + e);
+ // The encryption key used is invalid.
+ refetchEncryptionKey--;
+ dfsClient.clearDataEncryptionKey();
+ } else if (e instanceof InvalidBlockTokenException && refetchToken > 0) {
DFSClient.LOG.info("Will get a new access token and retry, "
+ "access token was invalid when connecting to " + targetAddr
+ " : " + e);
@@ -818,8 +837,9 @@ public class DFSInputStream extends FSIn
*/
private void closeBlockReader(BlockReader reader) throws IOException {
if (reader.hasSentStatusCode()) {
+ IOStreamPair ioStreams = reader.getStreams();
Socket oldSock = reader.takeSocket();
- socketCache.put(oldSock);
+ socketCache.put(oldSock, ioStreams);
}
reader.close();
}
@@ -864,14 +884,15 @@ public class DFSInputStream extends FSIn
// Allow retry since there is no way of knowing whether the cached socket
// is good until we actually use it.
for (int retries = 0; retries <= nCachedConnRetry && fromCache; ++retries) {
- Socket sock = null;
+ SocketAndStreams sockAndStreams = null;
// Don't use the cache on the last attempt - it's possible that there
// are arbitrarily many unusable sockets in the cache, but we don't
// want to fail the read.
if (retries < nCachedConnRetry) {
- sock = socketCache.get(dnAddr);
+ sockAndStreams = socketCache.get(dnAddr);
}
- if (sock == null) {
+ Socket sock;
+ if (sockAndStreams == null) {
fromCache = false;
sock = dfsClient.socketFactory.createSocket();
@@ -895,6 +916,8 @@ public class DFSInputStream extends FSIn
dfsClient.getRandomLocalInterfaceAddr(),
dfsClient.getConf().socketTimeout);
sock.setSoTimeout(dfsClient.getConf().socketTimeout);
+ } else {
+ sock = sockAndStreams.sock;
}
try {
@@ -905,12 +928,18 @@ public class DFSInputStream extends FSIn
blockToken,
startOffset, len,
bufferSize, verifyChecksum,
- clientName);
+ clientName,
+ dfsClient.getDataEncryptionKey(),
+ sockAndStreams == null ? null : sockAndStreams.ioStreams);
return reader;
} catch (IOException ex) {
// Our socket is no good.
DFSClient.LOG.debug("Error making BlockReader. Closing stale " + sock, ex);
- sock.close();
+ if (sockAndStreams != null) {
+ sockAndStreams.close();
+ } else {
+ sock.close();
+ }
err = ex;
}
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1370354&r1=1370353&r2=1370354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Tue Aug 7 16:40:03 2012
@@ -24,7 +24,9 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.InputStream;
import java.io.InterruptedIOException;
+import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.BufferOverflowException;
@@ -56,6 +58,9 @@ import org.apache.hadoop.hdfs.protocol.N
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@@ -867,16 +872,26 @@ public class DFSOutputStream extends FSO
try {
sock = createSocketForPipeline(src, 2, dfsClient);
final long writeTimeout = dfsClient.getDatanodeWriteTimeout(2);
- out = new DataOutputStream(new BufferedOutputStream(
- NetUtils.getOutputStream(sock, writeTimeout),
+
+ OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
+ InputStream unbufIn = NetUtils.getInputStream(sock);
+ if (dfsClient.shouldEncryptData()) {
+ IOStreamPair encryptedStreams =
+ DataTransferEncryptor.getEncryptedStreams(
+ unbufOut, unbufIn, dfsClient.getDataEncryptionKey());
+ unbufOut = encryptedStreams.out;
+ unbufIn = encryptedStreams.in;
+ }
+ out = new DataOutputStream(new BufferedOutputStream(unbufOut,
HdfsConstants.SMALL_BUFFER_SIZE));
+ in = new DataInputStream(unbufIn);
//send the TRANSFER_BLOCK request
new Sender(out).transferBlock(block, blockToken, dfsClient.clientName,
targets);
+ out.flush();
//ack
- in = new DataInputStream(NetUtils.getInputStream(sock));
BlockOpResponseProto response =
BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(in));
if (SUCCESS != response.getStatus()) {
@@ -1034,77 +1049,98 @@ public class DFSOutputStream extends FSO
// persist blocks on namenode on next flush
persistBlocks.set(true);
- boolean result = false;
- DataOutputStream out = null;
- try {
- assert null == s : "Previous socket unclosed";
- s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
- long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);
-
- //
- // Xmit header info to datanode
- //
- out = new DataOutputStream(new BufferedOutputStream(
- NetUtils.getOutputStream(s, writeTimeout),
- HdfsConstants.SMALL_BUFFER_SIZE));
-
- assert null == blockReplyStream : "Previous blockReplyStream unclosed";
- blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));
-
- // send the request
- new Sender(out).writeBlock(block, accessToken, dfsClient.clientName,
- nodes, null, recoveryFlag? stage.getRecoveryStage() : stage,
- nodes.length, block.getNumBytes(), bytesSent, newGS, checksum);
-
- // receive ack for connect
- BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
- HdfsProtoUtil.vintPrefixed(blockReplyStream));
- pipelineStatus = resp.getStatus();
- firstBadLink = resp.getFirstBadLink();
-
- if (pipelineStatus != SUCCESS) {
- if (pipelineStatus == Status.ERROR_ACCESS_TOKEN) {
- throw new InvalidBlockTokenException(
- "Got access token error for connect ack with firstBadLink as "
- + firstBadLink);
- } else {
- throw new IOException("Bad connect ack with firstBadLink as "
- + firstBadLink);
+ int refetchEncryptionKey = 1;
+ while (true) {
+ boolean result = false;
+ DataOutputStream out = null;
+ try {
+ assert null == s : "Previous socket unclosed";
+ assert null == blockReplyStream : "Previous blockReplyStream unclosed";
+ s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
+ long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);
+
+ OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
+ InputStream unbufIn = NetUtils.getInputStream(s);
+ if (dfsClient.shouldEncryptData()) {
+ IOStreamPair encryptedStreams =
+ DataTransferEncryptor.getEncryptedStreams(unbufOut,
+ unbufIn, dfsClient.getDataEncryptionKey());
+ unbufOut = encryptedStreams.out;
+ unbufIn = encryptedStreams.in;
+ }
+ out = new DataOutputStream(new BufferedOutputStream(unbufOut,
+ HdfsConstants.SMALL_BUFFER_SIZE));
+ blockReplyStream = new DataInputStream(unbufIn);
+
+ //
+ // Xmit header info to datanode
+ //
+
+ // send the request
+ new Sender(out).writeBlock(block, accessToken, dfsClient.clientName,
+ nodes, null, recoveryFlag? stage.getRecoveryStage() : stage,
+ nodes.length, block.getNumBytes(), bytesSent, newGS, checksum);
+
+ // receive ack for connect
+ BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
+ HdfsProtoUtil.vintPrefixed(blockReplyStream));
+ pipelineStatus = resp.getStatus();
+ firstBadLink = resp.getFirstBadLink();
+
+ if (pipelineStatus != SUCCESS) {
+ if (pipelineStatus == Status.ERROR_ACCESS_TOKEN) {
+ throw new InvalidBlockTokenException(
+ "Got access token error for connect ack with firstBadLink as "
+ + firstBadLink);
+ } else {
+ throw new IOException("Bad connect ack with firstBadLink as "
+ + firstBadLink);
+ }
}
- }
- assert null == blockStream : "Previous blockStream unclosed";
- blockStream = out;
- result = true; // success
-
- } catch (IOException ie) {
-
- DFSClient.LOG.info("Exception in createBlockOutputStream", ie);
-
- // find the datanode that matches
- if (firstBadLink.length() != 0) {
- for (int i = 0; i < nodes.length; i++) {
- if (nodes[i].getXferAddr().equals(firstBadLink)) {
- errorIndex = i;
- break;
+ assert null == blockStream : "Previous blockStream unclosed";
+ blockStream = out;
+ result = true; // success
+
+ } catch (IOException ie) {
+ DFSClient.LOG.info("Exception in createBlockOutputStream", ie);
+ if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
+ DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+ + "encryption key was invalid when connecting to "
+ + nodes[0].getXferAddr() + " : " + ie);
+ // The encryption key used is invalid.
+ refetchEncryptionKey--;
+ dfsClient.clearDataEncryptionKey();
+ // Don't exclude this node just yet. The finally block closes this socket;
+ // the loop then retries on a new connection with a new encryption key.
+ continue;
+ }
+
+ // find the datanode that matches
+ if (firstBadLink.length() != 0) {
+ for (int i = 0; i < nodes.length; i++) {
+ if (nodes[i].getXferAddr().equals(firstBadLink)) {
+ errorIndex = i;
+ break;
+ }
}
+ } else {
+ errorIndex = 0;
+ }
+ hasError = true;
+ setLastException(ie);
+ result = false; // error
+ } finally {
+ if (!result) {
+ IOUtils.closeSocket(s);
+ s = null;
+ IOUtils.closeStream(out);
+ out = null;
+ IOUtils.closeStream(blockReplyStream);
+ blockReplyStream = null;
}
- } else {
- errorIndex = 0;
- }
- hasError = true;
- setLastException(ie);
- result = false; // error
- } finally {
- if (!result) {
- IOUtils.closeSocket(s);
- s = null;
- IOUtils.closeStream(out);
- out = null;
- IOUtils.closeStream(blockReplyStream);
- blockReplyStream = null;
}
+ return result;
}
- return result;
}
private LocatedBlock locateFollowingBlock(long start,
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java?rev=1370354&r1=1370353&r2=1370354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java Tue Aug 7 16:40:03 2012
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FSInputCheck
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
@@ -458,7 +459,9 @@ public class RemoteBlockReader extends F
void sendReadResult(Socket sock, Status statusCode) {
assert !sentStatusCode : "already sent status code to " + sock;
try {
- RemoteBlockReader2.writeReadResult(sock, statusCode);
+ RemoteBlockReader2.writeReadResult(
+ NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT),
+ statusCode);
sentStatusCode = true;
} catch (IOException e) {
// It's ok not to be able to send this. But something is probably wrong.
@@ -484,4 +487,11 @@ public class RemoteBlockReader extends F
throw new UnsupportedOperationException("readDirect unsupported in RemoteBlockReader");
}
+ /**
+ * Always returns null for this reader.
+ *
+ * @return null
+ */
+ @Override
+ public IOStreamPair getStreams() {
+ // This class doesn't support encryption, which is the only thing this
+ // method is used for. See HDFS-3637.
+ return null;
+ }
+
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java?rev=1370354&r1=1370353&r2=1370354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java Tue Aug 7 16:40:03 2012
@@ -23,6 +23,7 @@ import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
@@ -35,12 +36,15 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@@ -83,7 +87,9 @@ public class RemoteBlockReader2 impleme
static final Log LOG = LogFactory.getLog(RemoteBlockReader2.class);
- Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read.
+ Socket dnSock;
+ // for now just sending the status code (e.g. checksumOk) after the read.
+ private IOStreamPair ioStreams;
private final ReadableByteChannel in;
private DataChecksum checksum;
@@ -206,9 +212,9 @@ public class RemoteBlockReader2 impleme
if (bytesNeededToFinish <= 0) {
readTrailingEmptyPacket();
if (verifyChecksum) {
- sendReadResult(dnSock, Status.CHECKSUM_OK);
+ sendReadResult(Status.CHECKSUM_OK);
} else {
- sendReadResult(dnSock, Status.SUCCESS);
+ sendReadResult(Status.SUCCESS);
}
}
}
@@ -292,9 +298,11 @@ public class RemoteBlockReader2 impleme
protected RemoteBlockReader2(String file, String bpid, long blockId,
ReadableByteChannel in, DataChecksum checksum, boolean verifyChecksum,
- long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock) {
+ long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock,
+ IOStreamPair ioStreams) {
// Path is used only for printing block and file information in debug
this.dnSock = dnSock;
+ this.ioStreams = ioStreams;
this.in = in;
this.checksum = checksum;
this.verifyChecksum = verifyChecksum;
@@ -369,24 +377,23 @@ public class RemoteBlockReader2 impleme
* closing our connection (which we will re-open), but won't affect
* data correctness.
*/
- void sendReadResult(Socket sock, Status statusCode) {
- assert !sentStatusCode : "already sent status code to " + sock;
+ void sendReadResult(Status statusCode) {
+ assert !sentStatusCode : "already sent status code to " + dnSock;
try {
- writeReadResult(sock, statusCode);
+ writeReadResult(ioStreams.out, statusCode);
sentStatusCode = true;
} catch (IOException e) {
// It's ok not to be able to send this. But something is probably wrong.
LOG.info("Could not send read status (" + statusCode + ") to datanode " +
- sock.getInetAddress() + ": " + e.getMessage());
+ dnSock.getInetAddress() + ": " + e.getMessage());
}
}
/**
* Serialize the actual read result on the wire.
*/
- static void writeReadResult(Socket sock, Status statusCode)
+ static void writeReadResult(OutputStream out, Status statusCode)
throws IOException {
- OutputStream out = NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT);
ClientReadStatusProto.newBuilder()
.setStatus(statusCode)
@@ -434,25 +441,32 @@ public class RemoteBlockReader2 impleme
* @param clientName Client name
* @return New BlockReader instance, or null on error.
*/
- public static BlockReader newBlockReader( Socket sock, String file,
+ public static BlockReader newBlockReader(Socket sock, String file,
ExtendedBlock block,
Token<BlockTokenIdentifier> blockToken,
long startOffset, long len,
int bufferSize, boolean verifyChecksum,
- String clientName)
+ String clientName,
+ DataEncryptionKey encryptionKey,
+ IOStreamPair ioStreams)
throws IOException {
+
+ ReadableByteChannel ch;
+ if (ioStreams.in instanceof SocketInputWrapper) {
+ ch = ((SocketInputWrapper)ioStreams.in).getReadableByteChannel();
+ } else {
+ ch = (ReadableByteChannel) ioStreams.in;
+ }
+
// in and out will be closed when sock is closed (by the caller)
final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
- NetUtils.getOutputStream(sock,
- HdfsServerConstants.WRITE_TIMEOUT)));
+ ioStreams.out));
new Sender(out).readBlock(block, blockToken, clientName, startOffset, len);
//
- // Get bytes in block, set streams
+ // Get bytes in block
//
- SocketInputWrapper sin = NetUtils.getInputStream(sock);
- ReadableByteChannel ch = sin.getReadableByteChannel();
- DataInputStream in = new DataInputStream(sin);
+ DataInputStream in = new DataInputStream(ioStreams.in);
BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
vintPrefixed(in));
@@ -474,7 +488,8 @@ public class RemoteBlockReader2 impleme
}
return new RemoteBlockReader2(file, block.getBlockPoolId(), block.getBlockId(),
- ch, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock);
+ ch, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock,
+ ioStreams);
}
static void checkSuccess(
@@ -498,4 +513,9 @@ public class RemoteBlockReader2 impleme
}
}
}
+
+ /**
+ * @return the stream pair stored in the ioStreams field, which the
+ * constructor sets from its ioStreams argument
+ */
+ @Override
+ public IOStreamPair getStreams() {
+ return ioStreams;
+ }
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java?rev=1370354&r1=1370353&r2=1370354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java Tue Aug 7 16:40:03 2012
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs;
+import java.io.Closeable;
import java.net.Socket;
import java.net.SocketAddress;
@@ -29,6 +30,8 @@ import com.google.common.base.Preconditi
import com.google.common.collect.LinkedListMultimap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.io.IOUtils;
/**
@@ -37,7 +40,7 @@ import org.apache.hadoop.io.IOUtils;
class SocketCache {
static final Log LOG = LogFactory.getLog(SocketCache.class);
- private final LinkedListMultimap<SocketAddress, Socket> multimap;
+ private final LinkedListMultimap<SocketAddress, SocketAndStreams> multimap;
private final int capacity;
/**
@@ -57,21 +60,21 @@ class SocketCache {
* @param remote Remote address the socket is connected to.
* @return A socket with unknown state, possibly closed underneath. Or null.
*/
- public synchronized Socket get(SocketAddress remote) {
+ public synchronized SocketAndStreams get(SocketAddress remote) {
if (capacity <= 0) { // disabled
return null;
}
- List<Socket> socklist = multimap.get(remote);
+ List<SocketAndStreams> socklist = multimap.get(remote);
if (socklist == null) {
return null;
}
- Iterator<Socket> iter = socklist.iterator();
+ Iterator<SocketAndStreams> iter = socklist.iterator();
while (iter.hasNext()) {
- Socket candidate = iter.next();
+ SocketAndStreams candidate = iter.next();
iter.remove();
- if (!candidate.isClosed()) {
+ if (!candidate.sock.isClosed()) {
return candidate;
}
}
@@ -82,10 +85,11 @@ class SocketCache {
* Give an unused socket to the cache.
* @param sock socket not used by anyone.
*/
- public synchronized void put(Socket sock) {
+ public synchronized void put(Socket sock, IOStreamPair ioStreams) {
+ SocketAndStreams s = new SocketAndStreams(sock, ioStreams);
if (capacity <= 0) {
// Cache disabled.
- IOUtils.closeSocket(sock);
+ s.close();
return;
}
@@ -102,7 +106,7 @@ class SocketCache {
if (capacity == multimap.size()) {
evictOldest();
}
- multimap.put(remoteAddr, sock);
+ multimap.put(remoteAddr, new SocketAndStreams(sock, ioStreams));
}
public synchronized int size() {
@@ -113,23 +117,23 @@ class SocketCache {
* Evict the oldest entry in the cache.
*/
private synchronized void evictOldest() {
- Iterator<Entry<SocketAddress, Socket>> iter =
+ Iterator<Entry<SocketAddress, SocketAndStreams>> iter =
multimap.entries().iterator();
if (!iter.hasNext()) {
throw new IllegalStateException("Cannot evict from empty cache!");
}
- Entry<SocketAddress, Socket> entry = iter.next();
+ Entry<SocketAddress, SocketAndStreams> entry = iter.next();
iter.remove();
- Socket sock = entry.getValue();
- IOUtils.closeSocket(sock);
+ SocketAndStreams s = entry.getValue();
+ s.close();
}
/**
* Empty the cache, and close all sockets.
*/
public synchronized void clear() {
- for (Socket sock : multimap.values()) {
- IOUtils.closeSocket(sock);
+ for (SocketAndStreams s : multimap.values()) {
+ s.close();
}
multimap.clear();
}
@@ -138,5 +142,25 @@ class SocketCache {
protected void finalize() {
clear();
}
+
+ /**
+ * Holds a socket together with the stream pair associated with it, so that
+ * the cache can store and close both as one unit.
+ */
+ @InterfaceAudience.Private
+ static class SocketAndStreams implements Closeable {
+ public final Socket sock;
+ // May be null; close() skips the streams in that case.
+ public final IOStreamPair ioStreams;
+
+ public SocketAndStreams(Socket s, IOStreamPair ioStreams) {
+ this.sock = s;
+ this.ioStreams = ioStreams;
+ }
+
+ /**
+ * Closes the input stream, then the output stream (both only when a stream
+ * pair is present), and finally the socket, via the IOUtils close helpers.
+ */
+ @Override
+ public void close() {
+ if (ioStreams != null) {
+ IOUtils.closeStream(ioStreams.in);
+ IOUtils.closeStream(ioStreams.out);
+ }
+ IOUtils.closeSocket(sock);
+ }
+ }
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1370354&r1=1370353&r2=1370354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Tue Aug 7 16:40:03 2012
@@ -44,6 +44,7 @@ import org.apache.hadoop.security.Access
import org.apache.hadoop.security.KerberosInfo;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenInfo;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
@@ -941,4 +942,11 @@ public interface ClientProtocol {
*/
public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
throws IOException;
+
+ /**
+ * @return encryption key so a client can encrypt data sent via the
+ * DataTransferProtocol to/from DataNodes.
+ * @throws IOException
+ */
+ public DataEncryptionKey getDataEncryptionKey() throws IOException;
}
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferEncryptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferEncryptor.java?rev=1370354&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferEncryptor.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferEncryptor.java Tue Aug 7 16:40:03 2012
@@ -0,0 +1,505 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+import java.util.TreeMap;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.RealmChoiceCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
+import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.security.SaslInputStream;
+import org.apache.hadoop.security.SaslOutputStream;
+
+import com.google.common.collect.Maps;
+import com.google.protobuf.ByteString;
+
+/**
+ * A class which, given connected input/output streams, will perform a
+ * handshake using those streams based on SASL to produce new Input/Output
+ * streams which will encrypt/decrypt all data written/read from said streams.
+ * Much of this is inspired by or borrowed from the TSaslTransport in Apache
+ * Thrift, but with some HDFS-specific tweaks.
+ */
+@InterfaceAudience.Private
+public class DataTransferEncryptor {
+
+ public static final Log LOG = LogFactory.getLog(DataTransferEncryptor.class);
+
+ /**
+ * Sent by clients and validated by servers. We use a number that's unlikely
+ * to ever be sent as the value of the DATA_TRANSFER_VERSION.
+ */
+ private static final int ENCRYPTED_TRANSFER_MAGIC_NUMBER = 0xDEADBEEF;
+
+ /**
+ * Delimiter for the three-part SASL username string.
+ */
+ private static final String NAME_DELIMITER = " ";
+
+ // A server name has to be set as part of the SASL spec and may not be
+ // empty, but its value doesn't matter for our purposes. It's sent over the
+ // wire, so use a short string.
+ private static final String SERVER_NAME = "0";
+
+ // Protocol and mechanism names handed to Sasl.createSaslServer and
+ // Sasl.createSaslClient by the two factory methods below.
+ private static final String PROTOCOL = "hdfs";
+ private static final String MECHANISM = "DIGEST-MD5";
+ // Base SASL properties. Each factory method copies this map and adds the
+ // cipher property to its own copy rather than modifying the map itself.
+ private static final Map<String, String> SASL_PROPS = new TreeMap<String, String>();
+
+ static {
+ // Request the "auth-conf" quality of protection; checkSaslComplete rejects
+ // a handshake whose negotiated QOP is anything else.
+ SASL_PROPS.put(Sasl.QOP, "auth-conf");
+ SASL_PROPS.put(Sasl.SERVER_AUTH, "true");
+ }
+
+ /**
+  * Factory method for DNs, where the nonce, keyId, and encryption key are not
+  * yet known. The nonce and keyId will be sent by the client, and the DN
+  * will then use those pieces of info and the secret key shared with the NN
+  * to determine the encryptionKey used for the SASL handshake/encryption.
+  *
+  * Establishes a secure connection assuming that the party on the other end
+  * has the same shared secret. This does a SASL connection handshake, but not
+  * a general-purpose one. It's specific to the DIGEST-MD5 SASL mechanism with
+  * auth-conf enabled. In particular, it doesn't support an arbitrary number of
+  * challenge/response rounds, and we know that the client will never have an
+  * initial response, so we don't check for one.
+  *
+  * If the handshake fails, an error message is sent to the client on a
+  * best-effort basis and the original exception is rethrown; a failure to
+  * deliver that message never replaces the original exception.
+  *
+  * @param underlyingOut output stream to write to the other party
+  * @param underlyingIn input stream to read from the other party
+  * @param blockPoolTokenSecretManager secret manager capable of constructing
+  *        encryption key based on keyId, blockPoolId, and nonce
+  * @param encryptionAlgorithm cipher name passed to the SASL server as the
+  *        "com.sun.security.sasl.digest.cipher" property
+  * @return a pair of streams which wrap the given streams and encrypt/decrypt
+  *         all data read/written
+  * @throws InvalidMagicNumberException if the first int read from the client
+  *         is not the encrypted-transfer magic number
+  * @throws IOException in the event of error
+  */
+ public static IOStreamPair getEncryptedStreams(
+     OutputStream underlyingOut, InputStream underlyingIn,
+     BlockPoolTokenSecretManager blockPoolTokenSecretManager,
+     String encryptionAlgorithm) throws IOException {
+
+   DataInputStream in = new DataInputStream(underlyingIn);
+   DataOutputStream out = new DataOutputStream(underlyingOut);
+
+   Map<String, String> saslProps = Maps.newHashMap(SASL_PROPS);
+   saslProps.put("com.sun.security.sasl.digest.cipher", encryptionAlgorithm);
+
+   if (LOG.isDebugEnabled()) {
+     LOG.debug("Server using encryption algorithm " + encryptionAlgorithm);
+   }
+
+   SaslParticipant sasl = new SaslParticipant(Sasl.createSaslServer(MECHANISM,
+       PROTOCOL, SERVER_NAME, saslProps,
+       new SaslServerCallbackHandler(blockPoolTokenSecretManager)));
+
+   // Read outside the try block: a wrong magic number is reported by
+   // exception only, and no SASL error message is written back.
+   int magicNumber = in.readInt();
+   if (magicNumber != ENCRYPTED_TRANSFER_MAGIC_NUMBER) {
+     throw new InvalidMagicNumberException(magicNumber);
+   }
+   try {
+     // step 1
+     performSaslStep1(out, in, sasl);
+
+     // step 2 (server-side only)
+     byte[] remoteResponse = readSaslMessage(in);
+     byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
+     sendSaslMessage(out, localResponse);
+
+     // SASL handshake is complete
+     checkSaslComplete(sasl);
+
+     return sasl.createEncryptedStreamPair(out, in);
+   } catch (IOException ioe) {
+     try {
+       if (ioe instanceof SaslException &&
+           ioe.getCause() instanceof InvalidEncryptionKeyException) {
+         // This could just be because the client is long-lived and hasn't
+         // gotten a new encryption key from the NN in a while. Upon receiving
+         // this error, the client will get a new encryption key from the NN
+         // and retry connecting to this DN.
+         sendInvalidKeySaslErrorMessage(out, ioe.getCause().getMessage());
+       } else {
+         sendGenericSaslErrorMessage(out, ioe.getMessage());
+       }
+     } catch (IOException sendFailure) {
+       // Notifying the client is best effort. The connection may already be
+       // broken, and the handshake failure is what the caller needs to see.
+       if (LOG.isDebugEnabled()) {
+         LOG.debug("Could not report SASL handshake failure to client",
+             sendFailure);
+       }
+     }
+     throw ioe;
+   }
+ }
+
+ /**
+  * Factory method for clients, where the encryption token is already created.
+  *
+  * Establishes a secure connection assuming that the party on the other end
+  * has the same shared secret. This does a SASL connection handshake, but not
+  * a general-purpose one. It's specific to the DIGEST-MD5 SASL mechanism with
+  * auth-conf enabled. In particular, it doesn't support an arbitrary number of
+  * challenge/response rounds, and we know that the client will never have an
+  * initial response, so we don't check for one.
+  *
+  * If the handshake fails, an error message is sent to the server on a
+  * best-effort basis and the original exception is rethrown unchanged, so
+  * callers can still distinguish an InvalidEncryptionKeyException (raised
+  * when the server replies that the key is unknown) from other failures.
+  *
+  * @param underlyingOut output stream to write to the other party
+  * @param underlyingIn input stream to read from the other party
+  * @param encryptionKey all info required to establish an encrypted stream
+  * @return a pair of streams which wrap the given streams and encrypt/decrypt
+  *         all data read/written
+  * @throws InvalidEncryptionKeyException if the server reports the key as
+  *         unknown
+  * @throws IOException in the event of error
+  */
+ public static IOStreamPair getEncryptedStreams(
+     OutputStream underlyingOut, InputStream underlyingIn,
+     DataEncryptionKey encryptionKey)
+         throws IOException {
+
+   Map<String, String> saslProps = Maps.newHashMap(SASL_PROPS);
+   saslProps.put("com.sun.security.sasl.digest.cipher",
+       encryptionKey.encryptionAlgorithm);
+
+   if (LOG.isDebugEnabled()) {
+     LOG.debug("Client using encryption algorithm " +
+         encryptionKey.encryptionAlgorithm);
+   }
+
+   DataOutputStream out = new DataOutputStream(underlyingOut);
+   DataInputStream in = new DataInputStream(underlyingIn);
+
+   String userName = getUserNameFromEncryptionKey(encryptionKey);
+   SaslParticipant sasl = new SaslParticipant(Sasl.createSaslClient(
+       new String[] { MECHANISM }, userName, PROTOCOL, SERVER_NAME, saslProps,
+       new SaslClientCallbackHandler(encryptionKey.encryptionKey, userName)));
+
+   out.writeInt(ENCRYPTED_TRANSFER_MAGIC_NUMBER);
+   out.flush();
+
+   try {
+     // Start of handshake - "initial response" in SASL terminology.
+     sendSaslMessage(out, new byte[0]);
+
+     // step 1
+     performSaslStep1(out, in, sasl);
+
+     // step 2 (client-side only)
+     byte[] remoteResponse = readSaslMessage(in);
+     byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
+     assert localResponse == null;
+
+     // SASL handshake is complete
+     checkSaslComplete(sasl);
+
+     return sasl.createEncryptedStreamPair(out, in);
+   } catch (IOException ioe) {
+     try {
+       sendGenericSaslErrorMessage(out, ioe.getMessage());
+     } catch (IOException sendFailure) {
+       // Notifying the server is best effort. If the write fails too, keep
+       // the original exception: its type drives the caller's handling.
+       if (LOG.isDebugEnabled()) {
+         LOG.debug("Could not report SASL handshake failure to server",
+             sendFailure);
+       }
+     }
+     throw ioe;
+   }
+ }
+
+ /**
+  * Runs one round of the handshake: reads a message from the peer, hands its
+  * payload to the local SASL participant, and sends the participant's answer
+  * back as a SUCCESS message.
+  *
+  * @param out stream the answer is written to
+  * @param in stream the peer's message is read from
+  * @param sasl local SASL participant evaluating the peer's payload
+  * @throws IOException if reading, evaluating, or writing fails
+  */
+ private static void performSaslStep1(DataOutputStream out, DataInputStream in,
+     SaslParticipant sasl) throws IOException {
+   byte[] fromPeer = readSaslMessage(in);
+   byte[] answer = sasl.evaluateChallengeOrResponse(fromPeer);
+   sendSaslMessage(out, answer);
+ }
+
+ /**
+  * Verifies that the handshake finished and that the negotiated quality of
+  * protection provides confidentiality.
+  *
+  * @param sasl participant whose handshake state is inspected
+  * @throws IOException if the handshake is incomplete, or is complete but did
+  *         not negotiate confidentiality
+  */
+ private static void checkSaslComplete(SaslParticipant sasl) throws IOException {
+   String failure = null;
+   if (!sasl.isComplete()) {
+     failure = "Failed to complete SASL handshake";
+   } else if (!sasl.supportsConfidentiality()) {
+     failure = "SASL handshake completed, but channel does not " +
+         "support encryption";
+   }
+
+   if (failure != null) {
+     throw new IOException(failure);
+   }
+ }
+
+ /**
+ * Sends a SUCCESS-status handshake message carrying the given payload and no
+ * error text.
+ *
+ * @param out stream to write the message to
+ * @param payload SASL bytes to send; may be null, in which case no payload
+ * field is set
+ * @throws IOException if the message cannot be written
+ */
+ private static void sendSaslMessage(DataOutputStream out, byte[] payload)
+ throws IOException {
+ sendSaslMessage(out, DataTransferEncryptorStatus.SUCCESS, payload, null);
+ }
+
+ /**
+ * Sends an ERROR_UNKNOWN_KEY handshake message with the given text and no
+ * payload. readSaslMessage on the receiving side turns this status into an
+ * InvalidEncryptionKeyException.
+ *
+ * @param out stream to write the message to
+ * @param message error text to include; may be null
+ * @throws IOException if the message cannot be written
+ */
+ private static void sendInvalidKeySaslErrorMessage(DataOutputStream out,
+ String message) throws IOException {
+ sendSaslMessage(out, DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY, null,
+ message);
+ }
+
+ /**
+ * Sends an ERROR handshake message with the given text and no payload.
+ * readSaslMessage on the receiving side turns this status into an
+ * IOException.
+ *
+ * @param out stream to write the message to
+ * @param message error text to include; may be null
+ * @throws IOException if the message cannot be written
+ */
+ private static void sendGenericSaslErrorMessage(DataOutputStream out,
+ String message) throws IOException {
+ sendSaslMessage(out, DataTransferEncryptorStatus.ERROR, null, message);
+ }
+
+ /**
+  * Builds a DataTransferEncryptorMessageProto from the given parts, writes it
+  * length-delimited to the stream, and flushes.
+  *
+  * @param out stream to write the message to
+  * @param status status to record in the message
+  * @param payload SASL bytes; the payload field is left unset when null
+  * @param message error text; the message field is left unset when null
+  * @throws IOException if writing or flushing fails
+  */
+ private static void sendSaslMessage(OutputStream out,
+     DataTransferEncryptorStatus status, byte[] payload, String message)
+         throws IOException {
+   DataTransferEncryptorMessageProto.Builder msg =
+       DataTransferEncryptorMessageProto.newBuilder();
+   msg.setStatus(status);
+
+   // Optional fields are only populated when the caller supplied a value.
+   if (payload != null) {
+     msg.setPayload(ByteString.copyFrom(payload));
+   }
+   if (message != null) {
+     msg.setMessage(message);
+   }
+
+   msg.build().writeDelimitedTo(out);
+   out.flush();
+ }
+
+ /**
+  * Reads one vint-prefixed DataTransferEncryptorMessageProto from the stream
+  * and returns its payload, converting error statuses into exceptions.
+  *
+  * @param in stream to read the message from
+  * @return the payload bytes of a message whose status is not an error
+  * @throws InvalidEncryptionKeyException if the status is ERROR_UNKNOWN_KEY
+  * @throws IOException if the status is ERROR, or reading/parsing fails
+  */
+ private static byte[] readSaslMessage(DataInputStream in) throws IOException {
+   DataTransferEncryptorMessageProto received =
+       DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in));
+   DataTransferEncryptorStatus status = received.getStatus();
+
+   if (status == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
+     throw new InvalidEncryptionKeyException(received.getMessage());
+   }
+   if (status == DataTransferEncryptorStatus.ERROR) {
+     throw new IOException(received.getMessage());
+   }
+   return received.getPayload().toByteArray();
+ }
+
+ /**
+  * Set the encryption key when asked by the server-side SASL object.
+  */
+ private static class SaslServerCallbackHandler implements CallbackHandler {
+
+   private final BlockPoolTokenSecretManager blockPoolTokenSecretManager;
+
+   public SaslServerCallbackHandler(BlockPoolTokenSecretManager
+       blockPoolTokenSecretManager) {
+     this.blockPoolTokenSecretManager = blockPoolTokenSecretManager;
+   }
+
+   /**
+    * Answers the callbacks issued by the SASL server. A PasswordCallback is
+    * given the password derived from the encryption key that the name in the
+    * accompanying NameCallback identifies; an AuthorizeCallback is always
+    * approved with the authorization ID it carries. RealmCallbacks are
+    * ignored.
+    *
+    * @param callbacks callbacks to answer
+    * @throws IOException if the encryption key cannot be determined, or a
+    *         password is requested without a NameCallback
+    * @throws UnsupportedCallbackException for any other callback type
+    */
+   @Override
+   public void handle(Callback[] callbacks) throws IOException,
+       UnsupportedCallbackException {
+     NameCallback nc = null;
+     PasswordCallback pc = null;
+     AuthorizeCallback ac = null;
+     for (Callback callback : callbacks) {
+       if (callback instanceof AuthorizeCallback) {
+         ac = (AuthorizeCallback) callback;
+       } else if (callback instanceof PasswordCallback) {
+         pc = (PasswordCallback) callback;
+       } else if (callback instanceof NameCallback) {
+         nc = (NameCallback) callback;
+       } else if (callback instanceof RealmCallback) {
+         continue; // realm is ignored
+       } else {
+         throw new UnsupportedCallbackException(callback,
+             "Unrecognized SASL DIGEST-MD5 Callback: " + callback);
+       }
+     }
+
+     if (pc != null) {
+       // The user name encodes which key to look up; without it there is
+       // nothing to derive the password from. Fail with a checked exception
+       // instead of a NullPointerException.
+       if (nc == null) {
+         throw new IOException("SASL password requested without a " +
+             "NameCallback identifying the encryption key");
+       }
+       byte[] encryptionKey = getEncryptionKeyFromUserName(
+           blockPoolTokenSecretManager, nc.getDefaultName());
+       pc.setPassword(encryptionKeyToPassword(encryptionKey));
+     }
+
+     if (ac != null) {
+       ac.setAuthorized(true);
+       ac.setAuthorizedID(ac.getAuthorizationID());
+     }
+
+   }
+
+ }
+
+ /**
+  * Supplies the user name, password, and realm when the client-side SASL
+  * object asks for them.
+  */
+ private static class SaslClientCallbackHandler implements CallbackHandler {
+
+   private byte[] encryptionKey;
+   private String userName;
+
+   public SaslClientCallbackHandler(byte[] encryptionKey, String userName) {
+     this.encryptionKey = encryptionKey;
+     this.userName = userName;
+   }
+
+   /**
+    * Answers name, password, and realm callbacks; RealmChoiceCallbacks are
+    * skipped. Nothing is answered if any callback is of an unsupported type.
+    *
+    * @param callbacks callbacks to answer
+    * @throws UnsupportedCallbackException for any other callback type
+    */
+   @Override
+   public void handle(Callback[] callbacks) throws IOException,
+       UnsupportedCallbackException {
+     NameCallback nameCb = null;
+     PasswordCallback passwordCb = null;
+     RealmCallback realmCb = null;
+
+     // First classify every callback, so that an unsupported one aborts
+     // before any of the others has been answered.
+     for (int i = 0; i < callbacks.length; i++) {
+       Callback cb = callbacks[i];
+       if (cb instanceof NameCallback) {
+         nameCb = (NameCallback) cb;
+       } else if (cb instanceof PasswordCallback) {
+         passwordCb = (PasswordCallback) cb;
+       } else if (cb instanceof RealmCallback) {
+         realmCb = (RealmCallback) cb;
+       } else if (!(cb instanceof RealmChoiceCallback)) {
+         throw new UnsupportedCallbackException(cb,
+             "Unrecognized SASL client callback");
+       }
+     }
+
+     if (nameCb != null) {
+       nameCb.setName(userName);
+     }
+     if (passwordCb != null) {
+       passwordCb.setPassword(encryptionKeyToPassword(encryptionKey));
+     }
+     if (realmCb != null) {
+       // Accept whatever realm the server proposed.
+       realmCb.setText(realmCb.getDefaultText());
+     }
+   }
+
+ }
+
+ /**
+  * The SASL username consists of the keyId, blockPoolId, and nonce with the
+  * first two encoded as Strings, and the third encoded using Base64. The
+  * fields are each separated by a single space.
+  *
+  * @param encryptionKey the encryption key to encode as a SASL username.
+  * @return encoded username containing keyId, blockPoolId, and nonce
+  */
+ private static String getUserNameFromEncryptionKey(
+     DataEncryptionKey encryptionKey) {
+   // Base64 output is plain ASCII. Decode it with an explicit charset so the
+   // result does not depend on the JVM's platform default encoding.
+   String encodedNonce = new String(
+       Base64.encodeBase64(encryptionKey.nonce, false),
+       java.nio.charset.Charset.forName("US-ASCII"));
+   return encryptionKey.keyId + NAME_DELIMITER +
+       encryptionKey.blockPoolId + NAME_DELIMITER +
+       encodedNonce;
+ }
+
+ /**
+  * Given a secret manager and a username encoded as described above, determine
+  * the encryption key.
+  *
+  * @param blockPoolTokenSecretManager to determine the encryption key.
+  * @param userName containing the keyId, blockPoolId, and nonce.
+  * @return secret encryption key.
+  * @throws IOException if the name does not have exactly three
+  *         space-separated components, if the keyId component is not an
+  *         integer, or if the secret manager cannot produce the key
+  */
+ private static byte[] getEncryptionKeyFromUserName(
+     BlockPoolTokenSecretManager blockPoolTokenSecretManager, String userName)
+         throws IOException {
+   String[] nameComponents = userName.split(NAME_DELIMITER);
+   if (nameComponents.length != 3) {
+     throw new IOException("Provided name '" + userName + "' has " +
+         nameComponents.length + " components instead of the expected 3.");
+   }
+
+   // The name is supplied by the remote peer, so a malformed keyId must be
+   // reported as an IOException like every other handshake failure rather
+   // than escaping as an unchecked NumberFormatException.
+   int keyId;
+   try {
+     keyId = Integer.parseInt(nameComponents[0]);
+   } catch (NumberFormatException nfe) {
+     throw new IOException("Provided name '" + userName +
+         "' does not start with a numeric key ID.", nfe);
+   }
+   String blockPoolId = nameComponents[1];
+   byte[] nonce = Base64.decodeBase64(nameComponents[2]);
+   return blockPoolTokenSecretManager.retrieveDataEncryptionKey(keyId,
+       blockPoolId, nonce);
+ }
+
+ /**
+  * Converts an encryption key into the SASL password: the unchunked Base64
+  * encoding of the key bytes, as characters.
+  *
+  * @param encryptionKey raw key bytes
+  * @return Base64 text of the key as a char array
+  */
+ private static char[] encryptionKeyToPassword(byte[] encryptionKey) {
+   // Base64 output is plain ASCII. Decode it with an explicit charset so both
+   // ends derive the same password regardless of platform default encoding.
+   return new String(Base64.encodeBase64(encryptionKey, false),
+       java.nio.charset.Charset.forName("US-ASCII")).toCharArray();
+ }
+
+ /**
+  * Thin adapter over <code>SaslServer</code> and <code>SaslClient</code>,
+  * which offer largely the same operations but have no common superclass.
+  *
+  * Strongly inspired by Thrift's TSaslTransport class.
+  */
+ private static class SaslParticipant {
+   // One of these will always be null.
+   public SaslServer saslServer;
+   public SaslClient saslClient;
+
+   public SaslParticipant(SaslServer saslServer) {
+     this.saslServer = saslServer;
+   }
+
+   public SaslParticipant(SaslClient saslClient) {
+     this.saslClient = saslClient;
+   }
+
+   /**
+    * Feeds the peer's bytes to the client (as a challenge) when one is set,
+    * otherwise to the server (as a response).
+    */
+   public byte[] evaluateChallengeOrResponse(byte[] challengeOrResponse)
+       throws SaslException {
+     return saslClient != null
+         ? saslClient.evaluateChallenge(challengeOrResponse)
+         : saslServer.evaluateResponse(challengeOrResponse);
+   }
+
+   /** @return whether the wrapped client or server reports completion */
+   public boolean isComplete() {
+     return saslClient != null
+         ? saslClient.isComplete()
+         : saslServer.isComplete();
+   }
+
+   /** @return true only if the negotiated QOP is exactly "auth-conf" */
+   public boolean supportsConfidentiality() {
+     String negotiatedQop = (String) (saslClient != null
+         ? saslClient.getNegotiatedProperty(Sasl.QOP)
+         : saslServer.getNegotiatedProperty(Sasl.QOP));
+     return "auth-conf".equals(negotiatedQop);
+   }
+
+   // Return some input/output streams that will henceforth have their
+   // communication encrypted.
+   private IOStreamPair createEncryptedStreamPair(
+       DataOutputStream out, DataInputStream in) {
+     if (saslClient == null) {
+       return new IOStreamPair(
+           new SaslInputStream(in, saslServer),
+           new SaslOutputStream(out, saslServer));
+     }
+     return new IOStreamPair(
+         new SaslInputStream(in, saslClient),
+         new SaslOutputStream(out, saslClient));
+   }
+ }
+
+ /**
+  * Thrown when the first int received from a client is not the
+  * encrypted-transfer magic number.
+  */
+ @InterfaceAudience.Private
+ public static class InvalidMagicNumberException extends IOException {
+
+   private static final long serialVersionUID = 1L;
+
+   /**
+    * @param magicNumber the value that was actually received
+    */
+   public InvalidMagicNumberException(int magicNumber) {
+     super(describe(magicNumber));
+   }
+
+   // Formats the received and expected values in hexadecimal.
+   private static String describe(int received) {
+     return String.format("Received %x instead of %x from client.",
+         received, ENCRYPTED_TRANSFER_MAGIC_NUMBER);
+   }
+ }
+
+}
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java?rev=1370354&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java Tue Aug 7 16:40:03 2012
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * A little struct class to wrap an InputStream and an OutputStream.
+ * Both streams are exposed as public final fields and are stored exactly
+ * as they were handed to the constructor; no wrapping or validation is done.
+ */
+@InterfaceAudience.Private
+public class IOStreamPair {
+ /** The input half of the pair. */
+ public final InputStream in;
+ /** The output half of the pair. */
+ public final OutputStream out;
+
+ /**
+  * Creates a pair from the two given streams.
+  *
+  * @param in the input stream to hold
+  * @param out the output stream to hold
+  */
+ public IOStreamPair(InputStream in, OutputStream out) {
+ this.in = in;
+ this.out = out;
+ }
+}
\ No newline at end of file
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/InvalidEncryptionKeyException.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/InvalidEncryptionKeyException.java?rev=1370354&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/InvalidEncryptionKeyException.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/InvalidEncryptionKeyException.java Tue Aug 7 16:40:03 2012
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Signals that verification of an encryption key failed.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class InvalidEncryptionKeyException extends IOException {
+  // Uppercase 'L' suffix: a lowercase 'l' is easily misread as the digit 1.
+  private static final long serialVersionUID = 0L;
+
+  /** Creates the exception without a detail message. */
+  public InvalidEncryptionKeyException() {
+    super();
+  }
+
+  /**
+   * Creates the exception with a detail message.
+   *
+   * @param msg the detail message passed on to {@link IOException}
+   */
+  public InvalidEncryptionKeyException(String msg) {
+    super(msg);
+  }
+}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java?rev=1370354&r1=1370353&r2=1370354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java Tue Aug 7 16:40:03 2012
@@ -38,10 +38,10 @@ import org.apache.hadoop.hdfs.protocol.p
@InterfaceAudience.Private
@InterfaceStability.Evolving
public abstract class Receiver implements DataTransferProtocol {
- protected final DataInputStream in;
-
- /** Create a receiver for DataTransferProtocol with a socket. */
- protected Receiver(final DataInputStream in) {
+ protected DataInputStream in;
+
+ /** Initialize a receiver for DataTransferProtocol with a socket. */
+ protected void initialize(final DataInputStream in) {
this.in = in;
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java?rev=1370354&r1=1370353&r2=1370354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java Tue Aug 7 16:40:03 2012
@@ -58,6 +58,8 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FsyncResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto.Builder;
@@ -127,6 +129,7 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
import org.apache.hadoop.io.Text;
@@ -830,4 +833,18 @@ public class ClientNamenodeProtocolServe
throw new ServiceException(e);
}
}
+
+ /**
+  * Server-side translation of the getDataEncryptionKey RPC: asks the
+  * wrapped server for its data encryption key and packs it into the
+  * protobuf response.
+  *
+  * @param controller the RPC controller (not used by this method)
+  * @param request the incoming request (carries no fields that are read here)
+  * @return a response holding the protobuf form of the key
+  * @throws ServiceException wrapping any IOException raised by the server
+  */
+ @Override
+ public GetDataEncryptionKeyResponseProto getDataEncryptionKey(
+     RpcController controller, GetDataEncryptionKeyRequestProto request)
+     throws ServiceException {
+   try {
+     final DataEncryptionKey key = server.getDataEncryptionKey();
+     final GetDataEncryptionKeyResponseProto.Builder response =
+         GetDataEncryptionKeyResponseProto.newBuilder();
+     response.setDataEncryptionKey(PBHelper.convert(key));
+     return response.build();
+   } catch (IOException ioe) {
+     throw new ServiceException(ioe);
+   }
+ }
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java?rev=1370354&r1=1370353&r2=1370354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java Tue Aug 7 16:40:03 2012
@@ -63,6 +63,7 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FinalizeUpgradeRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FsyncRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetContentSummaryRequestProto;
@@ -99,6 +100,7 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
@@ -815,9 +817,22 @@ public class ClientNamenodeProtocolTrans
ClientNamenodeProtocolPB.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER,
RPC.getProtocolVersion(ClientNamenodeProtocolPB.class), methodName);
}
+
+ /**
+  * Client-side translation of the getDataEncryptionKey RPC: sends an empty
+  * request through the RPC proxy and converts the protobuf key in the
+  * response back into a DataEncryptionKey.
+  *
+  * @return the data encryption key carried in the response
+  * @throws IOException the exception extracted from a ServiceException
+  *         raised by the RPC call
+  */
+ @Override
+ public DataEncryptionKey getDataEncryptionKey() throws IOException {
+   final GetDataEncryptionKeyRequestProto emptyRequest =
+       GetDataEncryptionKeyRequestProto.newBuilder().build();
+   try {
+     // The call is made without an RpcController (null is passed).
+     return PBHelper.convert(
+         rpcProxy.getDataEncryptionKey(null, emptyRequest)
+             .getDataEncryptionKey());
+   } catch (ServiceException se) {
+     throw ProtobufHelper.getRemoteException(se);
+   }
+ }
@Override
public Object getUnderlyingProxyObject() {
return rpcProxy;
}
+
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1370354&r1=1370353&r2=1370354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Tue Aug 7 16:40:03 2012
@@ -62,6 +62,7 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.UpgradeCommandProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DataEncryptionKeyProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTokenIdentifierProto;
@@ -96,6 +97,7 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.UpgradeStatusReportProto;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalInfoProto;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.BlockKey;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
@@ -970,12 +972,37 @@ public class PBHelper {
.setIsLastBlockComplete(lb.isLastBlockComplete()).build();
}
+ // DataEncryptionKey
+ /**
+  * Converts the protobuf form of a data encryption key into its Java
+  * object form.
+  *
+  * @param bet the protobuf message to convert
+  * @return a DataEncryptionKey built from the message's fields; its
+  *         encryption algorithm is null when the message carries an
+  *         empty algorithm string
+  */
+ public static DataEncryptionKey convert(DataEncryptionKeyProto bet) {
+   String algorithm = bet.getEncryptionAlgorithm();
+   if (algorithm.isEmpty()) {
+     // An empty string in the message stands for "no algorithm" (null).
+     algorithm = null;
+   }
+   return new DataEncryptionKey(
+       bet.getKeyId(),
+       bet.getBlockPoolId(),
+       bet.getNonce().toByteArray(),
+       bet.getEncryptionKey().toByteArray(),
+       bet.getExpiryDate(),
+       algorithm);
+ }
+
+ /**
+  * Converts a DataEncryptionKey into its protobuf form.
+  *
+  * @param bet the key to convert
+  * @return a message carrying the key's fields; the encryption algorithm
+  *         field is only set when the key's algorithm is non-null
+  */
+ public static DataEncryptionKeyProto convert(DataEncryptionKey bet) {
+   final DataEncryptionKeyProto.Builder builder =
+       DataEncryptionKeyProto.newBuilder();
+   builder.setKeyId(bet.keyId);
+   builder.setBlockPoolId(bet.blockPoolId);
+   builder.setNonce(ByteString.copyFrom(bet.nonce));
+   builder.setEncryptionKey(ByteString.copyFrom(bet.encryptionKey));
+   builder.setExpiryDate(bet.expiryDate);
+   if (bet.encryptionAlgorithm != null) {
+     builder.setEncryptionAlgorithm(bet.encryptionAlgorithm);
+   }
+   return builder.build();
+ }
+
public static FsServerDefaults convert(FsServerDefaultsProto fs) {
if (fs == null) return null;
return new FsServerDefaults(
fs.getBlockSize(), fs.getBytesPerChecksum(),
fs.getWritePacketSize(), (short) fs.getReplication(),
- fs.getFileBufferSize());
+ fs.getFileBufferSize(),
+ fs.getEncryptDataTransfer());
}
public static FsServerDefaultsProto convert(FsServerDefaults fs) {
@@ -983,7 +1010,10 @@ public class PBHelper {
return FsServerDefaultsProto.newBuilder().
setBlockSize(fs.getBlockSize()).
setBytesPerChecksum(fs.getBytesPerChecksum()).
- setWritePacketSize(fs.getWritePacketSize()).setReplication(fs.getReplication()).setFileBufferSize(fs.getFileBufferSize()).build();
+ setWritePacketSize(fs.getWritePacketSize())
+ .setReplication(fs.getReplication())
+ .setFileBufferSize(fs.getFileBufferSize())
+ .setEncryptDataTransfer(fs.getEncryptDataTransfer()).build();
}
public static FsPermissionProto convert(FsPermission p) {
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java?rev=1370354&r1=1370353&r2=1370354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java Tue Aug 7 16:40:03 2012
@@ -119,4 +119,13 @@ public class BlockPoolTokenSecretManager
btsm.clearAllKeysForTesting();
}
}
+
+ /**
+  * Generates a data encryption key for one block pool by delegating to
+  * whatever get(blockPoolId) returns for that pool.
+  *
+  * @param blockPoolId ID of the block pool, used to look up the delegate
+  * @return the key produced by the delegate's generateDataEncryptionKey()
+  */
+ public DataEncryptionKey generateDataEncryptionKey(String blockPoolId) {
+ return get(blockPoolId).generateDataEncryptionKey();
+ }
+
+ /**
+  * Retrieves the data encryption key bytes for one block pool by delegating
+  * to whatever get(blockPoolId) returns for that pool.
+  *
+  * @param keyId ID of the key, passed through to the delegate
+  * @param blockPoolId ID of the block pool, used to look up the delegate
+  * @param nonce nonce passed through to the delegate
+  * @return the bytes returned by the delegate's retrieveDataEncryptionKey()
+  * @throws IOException presumably propagated from the delegate; the
+  *         conditions are not visible here -- TODO confirm
+  */
+ public byte[] retrieveDataEncryptionKey(int keyId, String blockPoolId,
+ byte[] nonce) throws IOException {
+ return get(blockPoolId).retrieveDataEncryptionKey(keyId, nonce);
+ }
}