You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by at...@apache.org on 2012/08/07 18:40:12 UTC

svn commit: r1370354 [1/2] - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/protocol/ src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ src/main/java/org/...

Author: atm
Date: Tue Aug  7 16:40:03 2012
New Revision: 1370354

URL: http://svn.apache.org/viewvc?rev=1370354&view=rev
Log:
HDFS-3637. Add support for encrypting the DataTransferProtocol. Contributed by Aaron T. Myers.

Added:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferEncryptor.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/InvalidEncryptionKeyException.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/DataEncryptionKey.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithEncryptedTransfer.java
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1370354&r1=1370353&r2=1370354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue Aug  7 16:40:03 2012
@@ -203,6 +203,8 @@ Branch-2 ( Unreleased changes )
 
     HDFS-3513. HttpFS should cache filesystems. (tucu)
 
+    HDFS-3637. Add support for encrypting the DataTransferProtocol. (atm)
+
   IMPROVEMENTS
 
     HDFS-3390. DFSAdmin should print full stack traces of errors when DEBUG

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java?rev=1370354&r1=1370353&r2=1370354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java Tue Aug  7 16:40:03 2012
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.net.Socket;
 
 import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 
 /**
  * A BlockReader is responsible for reading a single block
@@ -71,4 +72,8 @@ public interface BlockReader extends Byt
    */
   boolean hasSentStatusCode();
 
+  /**
+   * @return a reference to the streams this block reader is using.
+   */
+  IOStreamPair getStreams();
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java?rev=1370354&r1=1370353&r2=1370354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java Tue Aug  7 16:40:03 2012
@@ -25,7 +25,12 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSClient.Conf;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
 
 
@@ -41,12 +46,13 @@ public class BlockReaderFactory {
       Configuration conf,
       Socket sock, String file,
       ExtendedBlock block, Token<BlockTokenIdentifier> blockToken, 
-      long startOffset, long len) throws IOException {
+      long startOffset, long len, DataEncryptionKey encryptionKey)
+          throws IOException {
     int bufferSize = conf.getInt(DFSConfigKeys.IO_FILE_BUFFER_SIZE_KEY,
         DFSConfigKeys.IO_FILE_BUFFER_SIZE_DEFAULT);
     return newBlockReader(new Conf(conf),
         sock, file, block, blockToken, startOffset,
-        len, bufferSize, true, "");
+        len, bufferSize, true, "", encryptionKey, null);
   }
 
   /**
@@ -73,14 +79,32 @@ public class BlockReaderFactory {
                                      Token<BlockTokenIdentifier> blockToken,
                                      long startOffset, long len,
                                      int bufferSize, boolean verifyChecksum,
-                                     String clientName)
+                                     String clientName,
+                                     DataEncryptionKey encryptionKey,
+                                     IOStreamPair ioStreams)
                                      throws IOException {
+    
     if (conf.useLegacyBlockReader) {
+      if (encryptionKey != null) {
+        throw new RuntimeException("Encryption is not supported with the legacy block reader.");
+      }
       return RemoteBlockReader.newBlockReader(
           sock, file, block, blockToken, startOffset, len, bufferSize, verifyChecksum, clientName);
     } else {
+      if (ioStreams == null) {
+        ioStreams = new IOStreamPair(NetUtils.getInputStream(sock),
+            NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT));
+        if (encryptionKey != null) {
+          IOStreamPair encryptedStreams =
+              DataTransferEncryptor.getEncryptedStreams(
+                  ioStreams.out, ioStreams.in, encryptionKey);
+          ioStreams = encryptedStreams;
+        }
+      }
+      
       return RemoteBlockReader2.newBlockReader(
-          sock, file, block, blockToken, startOffset, len, bufferSize, verifyChecksum, clientName);      
+          sock, file, block, blockToken, startOffset, len, bufferSize,
+          verifyChecksum, clientName, encryptionKey, ioStreams);
     }
   }
   

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java?rev=1370354&r1=1370353&r2=1370354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java Tue Aug  7 16:40:03 2012
@@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.util.DirectBufferPool;
@@ -681,4 +682,9 @@ class BlockReaderLocal implements BlockR
   public boolean hasSentStatusCode() {
     return false;
   }
+
+  @Override
+  public IOStreamPair getStreams() {
+    return null;
+  }
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1370354&r1=1370353&r2=1370354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Tue Aug  7 16:40:03 2012
@@ -45,6 +45,8 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
 
@@ -53,6 +55,7 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -107,12 +110,15 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
 import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@@ -179,6 +185,7 @@ public class DFSClient implements java.i
   final Conf dfsClientConf;
   private Random r = new Random();
   private SocketAddress[] localInterfaceAddrs;
+  private DataEncryptionKey encryptionKey;
 
   /**
    * DFSClient configuration 
@@ -348,9 +355,6 @@ public class DFSClient implements java.i
     this.clientName = "DFSClient_" + dfsClientConf.taskId + "_" + 
         DFSUtil.getRandom().nextInt()  + "_" + Thread.currentThread().getId();
     
-    this.socketCache = new SocketCache(dfsClientConf.socketCacheCapacity);
-    
-    
     if (rpcNamenode != null) {
       // This case is used for testing.
       Preconditions.checkArgument(nameNodeUri == null);
@@ -380,6 +384,8 @@ public class DFSClient implements java.i
       Joiner.on(',').join(localInterfaces)+ "] with addresses [" +
       Joiner.on(',').join(localInterfaceAddrs) + "]");
     }
+    
+    this.socketCache = new SocketCache(dfsClientConf.socketCacheCapacity);
   }
 
   /**
@@ -1418,7 +1424,44 @@ public class DFSClient implements java.i
    */
   public MD5MD5CRC32FileChecksum getFileChecksum(String src) throws IOException {
     checkOpen();
-    return getFileChecksum(src, namenode, socketFactory, dfsClientConf.socketTimeout);    
+    return getFileChecksum(src, namenode, socketFactory,
+        dfsClientConf.socketTimeout, getDataEncryptionKey());
+  }
+  
+  @InterfaceAudience.Private
+  public void clearDataEncryptionKey() {
+    LOG.debug("Clearing encryption key");
+    synchronized (this) {
+      encryptionKey = null;
+    }
+  }
+  
+  /**
+   * @return true if data sent between this client and DNs should be encrypted,
+   *         false otherwise.
+   * @throws IOException in the event of error communicating with the NN
+   */
+  boolean shouldEncryptData() throws IOException {
+    FsServerDefaults d = getServerDefaults();
+    return d == null ? false : d.getEncryptDataTransfer();
+  }
+  
+  @InterfaceAudience.Private
+  public DataEncryptionKey getDataEncryptionKey()
+      throws IOException {
+    if (shouldEncryptData()) {
+      synchronized (this) {
+        if (encryptionKey == null ||
+            (encryptionKey != null &&
+             encryptionKey.expiryDate < Time.now())) {
+          LOG.debug("Getting new encryption token from NN");
+          encryptionKey = namenode.getDataEncryptionKey();
+        }
+        return encryptionKey;
+      }
+    } else {
+      return null;
+    }
   }
 
   /**
@@ -1427,8 +1470,8 @@ public class DFSClient implements java.i
    * @return The checksum 
    */
   public static MD5MD5CRC32FileChecksum getFileChecksum(String src,
-      ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout
-      ) throws IOException {
+      ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout,
+      DataEncryptionKey encryptionKey) throws IOException {
     //get all block locations
     LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0, Long.MAX_VALUE);
     if (null == blockLocations) {
@@ -1471,10 +1514,18 @@ public class DFSClient implements java.i
               timeout);
           sock.setSoTimeout(timeout);
 
-          out = new DataOutputStream(
-              new BufferedOutputStream(NetUtils.getOutputStream(sock), 
-                                       HdfsConstants.SMALL_BUFFER_SIZE));
-          in = new DataInputStream(NetUtils.getInputStream(sock));
+          OutputStream unbufOut = NetUtils.getOutputStream(sock);
+          InputStream unbufIn = NetUtils.getInputStream(sock);
+          if (encryptionKey != null) {
+            IOStreamPair encryptedStreams =
+                DataTransferEncryptor.getEncryptedStreams(
+                    unbufOut, unbufIn, encryptionKey);
+            unbufOut = encryptedStreams.out;
+            unbufIn = encryptedStreams.in;
+          }
+          out = new DataOutputStream(new BufferedOutputStream(unbufOut,
+              HdfsConstants.SMALL_BUFFER_SIZE));
+          in = new DataInputStream(unbufIn);
 
           if (LOG.isDebugEnabled()) {
             LOG.debug("write to " + datanodes[j] + ": "

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1370354&r1=1370353&r2=1370354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Tue Aug  7 16:40:03 2012
@@ -367,4 +367,9 @@ public class DFSConfigKeys extends Commo
   public static final boolean DFS_HA_AUTO_FAILOVER_ENABLED_DEFAULT = false;
   public static final String DFS_HA_ZKFC_PORT_KEY = "dfs.ha.zkfc.port";
   public static final int DFS_HA_ZKFC_PORT_DEFAULT = 8019;
+  
+  // Security-related configs
+  public static final String DFS_ENCRYPT_DATA_TRANSFER_KEY = "dfs.encrypt.data.transfer";
+  public static final boolean DFS_ENCRYPT_DATA_TRANSFER_DEFAULT = false;
+  public static final String DFS_DATA_ENCRYPTION_ALGORITHM_KEY = "dfs.encrypt.data.transfer.algorithm";
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1370354&r1=1370353&r2=1370354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Tue Aug  7 16:40:03 2012
@@ -37,11 +37,14 @@ import org.apache.hadoop.fs.ChecksumExce
 import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.hdfs.SocketCache.SocketAndStreams;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
@@ -425,6 +428,7 @@ public class DFSInputStream extends FSIn
     //
     DatanodeInfo chosenNode = null;
     int refetchToken = 1; // only need to get a new access token once
+    int refetchEncryptionKey = 1; // only need to get a new encryption key once
     
     boolean connectFailedOnce = false;
 
@@ -452,7 +456,14 @@ public class DFSInputStream extends FSIn
         }
         return chosenNode;
       } catch (IOException ex) {
-        if (ex instanceof InvalidBlockTokenException && refetchToken > 0) {
+        if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
+          DFSClient.LOG.info("Will fetch a new encryption key and retry, " 
+              + "encryption key was invalid when connecting to " + targetAddr
+              + " : " + ex);
+          // The encryption key used is invalid.
+          refetchEncryptionKey--;
+          dfsClient.clearDataEncryptionKey();
+        } else if (ex instanceof InvalidBlockTokenException && refetchToken > 0) {
           DFSClient.LOG.info("Will fetch a new access token and retry, " 
               + "access token was invalid when connecting to " + targetAddr
               + " : " + ex);
@@ -754,6 +765,7 @@ public class DFSInputStream extends FSIn
     // Connect to best DataNode for desired Block, with potential offset
     //
     int refetchToken = 1; // only need to get a new access token once
+    int refetchEncryptionKey = 1; // only need to get a new encryption key once
     
     while (true) {
       // cached block locations may have been updated by chooseDataNode()
@@ -789,7 +801,14 @@ public class DFSInputStream extends FSIn
         dfsClient.disableShortCircuit();
         continue;
       } catch (IOException e) {
-        if (e instanceof InvalidBlockTokenException && refetchToken > 0) {
+        if (e instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
+          DFSClient.LOG.info("Will fetch a new encryption key and retry, " 
+              + "encryption key was invalid when connecting to " + targetAddr
+              + " : " + e);
+          // The encryption key used is invalid.
+          refetchEncryptionKey--;
+          dfsClient.clearDataEncryptionKey();
+        } else if (e instanceof InvalidBlockTokenException && refetchToken > 0) {
           DFSClient.LOG.info("Will get a new access token and retry, "
               + "access token was invalid when connecting to " + targetAddr
               + " : " + e);
@@ -818,8 +837,9 @@ public class DFSInputStream extends FSIn
    */
   private void closeBlockReader(BlockReader reader) throws IOException {
     if (reader.hasSentStatusCode()) {
+      IOStreamPair ioStreams = reader.getStreams();
       Socket oldSock = reader.takeSocket();
-      socketCache.put(oldSock);
+      socketCache.put(oldSock, ioStreams);
     }
     reader.close();
   }
@@ -864,14 +884,15 @@ public class DFSInputStream extends FSIn
     // Allow retry since there is no way of knowing whether the cached socket
     // is good until we actually use it.
     for (int retries = 0; retries <= nCachedConnRetry && fromCache; ++retries) {
-      Socket sock = null;
+      SocketAndStreams sockAndStreams = null;
       // Don't use the cache on the last attempt - it's possible that there
       // are arbitrarily many unusable sockets in the cache, but we don't
       // want to fail the read.
       if (retries < nCachedConnRetry) {
-        sock = socketCache.get(dnAddr);
+        sockAndStreams = socketCache.get(dnAddr);
       }
-      if (sock == null) {
+      Socket sock;
+      if (sockAndStreams == null) {
         fromCache = false;
 
         sock = dfsClient.socketFactory.createSocket();
@@ -895,6 +916,8 @@ public class DFSInputStream extends FSIn
             dfsClient.getRandomLocalInterfaceAddr(),
             dfsClient.getConf().socketTimeout);
         sock.setSoTimeout(dfsClient.getConf().socketTimeout);
+      } else {
+        sock = sockAndStreams.sock;
       }
 
       try {
@@ -905,12 +928,18 @@ public class DFSInputStream extends FSIn
                                        blockToken,
                                        startOffset, len,
                                        bufferSize, verifyChecksum,
-                                       clientName);
+                                       clientName,
+                                       dfsClient.getDataEncryptionKey(),
+                                       sockAndStreams == null ? null : sockAndStreams.ioStreams);
         return reader;
       } catch (IOException ex) {
         // Our socket is no good.
         DFSClient.LOG.debug("Error making BlockReader. Closing stale " + sock, ex);
-        sock.close();
+        if (sockAndStreams != null) {
+          sockAndStreams.close();
+        } else {
+          sock.close();
+        }
         err = ex;
       }
     }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1370354&r1=1370353&r2=1370354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Tue Aug  7 16:40:03 2012
@@ -24,7 +24,9 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InterruptedIOException;
+import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.nio.BufferOverflowException;
@@ -56,6 +58,9 @@ import org.apache.hadoop.hdfs.protocol.N
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@@ -867,16 +872,26 @@ public class DFSOutputStream extends FSO
       try {
         sock = createSocketForPipeline(src, 2, dfsClient);
         final long writeTimeout = dfsClient.getDatanodeWriteTimeout(2);
-        out = new DataOutputStream(new BufferedOutputStream(
-            NetUtils.getOutputStream(sock, writeTimeout),
+        
+        OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
+        InputStream unbufIn = NetUtils.getInputStream(sock);
+        if (dfsClient.shouldEncryptData()) {
+          IOStreamPair encryptedStreams =
+              DataTransferEncryptor.getEncryptedStreams(
+                  unbufOut, unbufIn, dfsClient.getDataEncryptionKey());
+          unbufOut = encryptedStreams.out;
+          unbufIn = encryptedStreams.in;
+        }
+        out = new DataOutputStream(new BufferedOutputStream(unbufOut,
             HdfsConstants.SMALL_BUFFER_SIZE));
+        in = new DataInputStream(unbufIn);
 
         //send the TRANSFER_BLOCK request
         new Sender(out).transferBlock(block, blockToken, dfsClient.clientName,
             targets);
+        out.flush();
 
         //ack
-        in = new DataInputStream(NetUtils.getInputStream(sock));
         BlockOpResponseProto response =
           BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(in));
         if (SUCCESS != response.getStatus()) {
@@ -1034,77 +1049,98 @@ public class DFSOutputStream extends FSO
       // persist blocks on namenode on next flush
       persistBlocks.set(true);
 
-      boolean result = false;
-      DataOutputStream out = null;
-      try {
-        assert null == s : "Previous socket unclosed";
-        s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
-        long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);
-
-        //
-        // Xmit header info to datanode
-        //
-        out = new DataOutputStream(new BufferedOutputStream(
-            NetUtils.getOutputStream(s, writeTimeout),
-            HdfsConstants.SMALL_BUFFER_SIZE));
-        
-        assert null == blockReplyStream : "Previous blockReplyStream unclosed";
-        blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));
-
-        // send the request
-        new Sender(out).writeBlock(block, accessToken, dfsClient.clientName,
-            nodes, null, recoveryFlag? stage.getRecoveryStage() : stage, 
-            nodes.length, block.getNumBytes(), bytesSent, newGS, checksum);
-
-        // receive ack for connect
-        BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
-            HdfsProtoUtil.vintPrefixed(blockReplyStream));
-        pipelineStatus = resp.getStatus();
-        firstBadLink = resp.getFirstBadLink();
-        
-        if (pipelineStatus != SUCCESS) {
-          if (pipelineStatus == Status.ERROR_ACCESS_TOKEN) {
-            throw new InvalidBlockTokenException(
-                "Got access token error for connect ack with firstBadLink as "
-                    + firstBadLink);
-          } else {
-            throw new IOException("Bad connect ack with firstBadLink as "
-                + firstBadLink);
+      int refetchEncryptionKey = 1;
+      while (true) {
+        boolean result = false;
+        DataOutputStream out = null;
+        try {
+          assert null == s : "Previous socket unclosed";
+          assert null == blockReplyStream : "Previous blockReplyStream unclosed";
+          s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
+          long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);
+          
+          OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
+          InputStream unbufIn = NetUtils.getInputStream(s);
+          if (dfsClient.shouldEncryptData()) {
+            IOStreamPair encryptedStreams =
+                DataTransferEncryptor.getEncryptedStreams(unbufOut,
+                    unbufIn, dfsClient.getDataEncryptionKey());
+            unbufOut = encryptedStreams.out;
+            unbufIn = encryptedStreams.in;
+          }
+          out = new DataOutputStream(new BufferedOutputStream(unbufOut,
+              HdfsConstants.SMALL_BUFFER_SIZE));
+          blockReplyStream = new DataInputStream(unbufIn);
+  
+          //
+          // Xmit header info to datanode
+          //
+  
+          // send the request
+          new Sender(out).writeBlock(block, accessToken, dfsClient.clientName,
+              nodes, null, recoveryFlag? stage.getRecoveryStage() : stage, 
+              nodes.length, block.getNumBytes(), bytesSent, newGS, checksum);
+  
+          // receive ack for connect
+          BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
+              HdfsProtoUtil.vintPrefixed(blockReplyStream));
+          pipelineStatus = resp.getStatus();
+          firstBadLink = resp.getFirstBadLink();
+          
+          if (pipelineStatus != SUCCESS) {
+            if (pipelineStatus == Status.ERROR_ACCESS_TOKEN) {
+              throw new InvalidBlockTokenException(
+                  "Got access token error for connect ack with firstBadLink as "
+                      + firstBadLink);
+            } else {
+              throw new IOException("Bad connect ack with firstBadLink as "
+                  + firstBadLink);
+            }
           }
-        }
-        assert null == blockStream : "Previous blockStream unclosed";
-        blockStream = out;
-        result =  true; // success
-
-      } catch (IOException ie) {
-
-        DFSClient.LOG.info("Exception in createBlockOutputStream", ie);
-
-        // find the datanode that matches
-        if (firstBadLink.length() != 0) {
-          for (int i = 0; i < nodes.length; i++) {
-            if (nodes[i].getXferAddr().equals(firstBadLink)) {
-              errorIndex = i;
-              break;
+          assert null == blockStream : "Previous blockStream unclosed";
+          blockStream = out;
+          result =  true; // success
+  
+        } catch (IOException ie) {
+          DFSClient.LOG.info("Exception in createBlockOutputStream", ie);
+          if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
+            DFSClient.LOG.info("Will fetch a new encryption key and retry, " 
+                + "encryption key was invalid when connecting to "
+                + nodes[0].getXferAddr() + " : " + ie);
+            // The encryption key used is invalid.
+            refetchEncryptionKey--;
+            dfsClient.clearDataEncryptionKey();
+            // Don't close the socket/exclude this node just yet. Try again with
+            // a new encryption key.
+            continue;
+          }
+  
+          // find the datanode that matches
+          if (firstBadLink.length() != 0) {
+            for (int i = 0; i < nodes.length; i++) {
+              if (nodes[i].getXferAddr().equals(firstBadLink)) {
+                errorIndex = i;
+                break;
+              }
             }
+          } else {
+            errorIndex = 0;
+          }
+          hasError = true;
+          setLastException(ie);
+          result =  false;  // error
+        } finally {
+          if (!result) {
+            IOUtils.closeSocket(s);
+            s = null;
+            IOUtils.closeStream(out);
+            out = null;
+            IOUtils.closeStream(blockReplyStream);
+            blockReplyStream = null;
           }
-        } else {
-          errorIndex = 0;
-        }
-        hasError = true;
-        setLastException(ie);
-        result =  false;  // error
-      } finally {
-        if (!result) {
-          IOUtils.closeSocket(s);
-          s = null;
-          IOUtils.closeStream(out);
-          out = null;
-          IOUtils.closeStream(blockReplyStream);
-          blockReplyStream = null;
         }
+        return result;
       }
-      return result;
     }
 
     private LocatedBlock locateFollowingBlock(long start,

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java?rev=1370354&r1=1370353&r2=1370354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java Tue Aug  7 16:40:03 2012
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FSInputCheck
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
@@ -458,7 +459,9 @@ public class RemoteBlockReader extends F
   void sendReadResult(Socket sock, Status statusCode) {
     assert !sentStatusCode : "already sent status code to " + sock;
     try {
-      RemoteBlockReader2.writeReadResult(sock, statusCode);
+      RemoteBlockReader2.writeReadResult(
+          NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT),
+          statusCode);
       sentStatusCode = true;
     } catch (IOException e) {
       // It's ok not to be able to send this. But something is probably wrong.
@@ -484,4 +487,11 @@ public class RemoteBlockReader extends F
     throw new UnsupportedOperationException("readDirect unsupported in RemoteBlockReader");
   }
 
+  @Override
+  public IOStreamPair getStreams() {
+    // This class doesn't support encryption, which is the only thing this
+    // method is used for. See HDFS-3637.
+    return null;
+  }
+
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java?rev=1370354&r1=1370353&r2=1370354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java Tue Aug  7 16:40:03 2012
@@ -23,6 +23,7 @@ import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.Socket;
@@ -35,12 +36,15 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@@ -83,7 +87,9 @@ public class RemoteBlockReader2  impleme
 
   static final Log LOG = LogFactory.getLog(RemoteBlockReader2.class);
   
-  Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read.
+  Socket dnSock;
+  // for now just sending the status code (e.g. checksumOk) after the read.
+  private IOStreamPair ioStreams;
   private final ReadableByteChannel in;
   private DataChecksum checksum;
   
@@ -206,9 +212,9 @@ public class RemoteBlockReader2  impleme
     if (bytesNeededToFinish <= 0) {
       readTrailingEmptyPacket();
       if (verifyChecksum) {
-        sendReadResult(dnSock, Status.CHECKSUM_OK);
+        sendReadResult(Status.CHECKSUM_OK);
       } else {
-        sendReadResult(dnSock, Status.SUCCESS);
+        sendReadResult(Status.SUCCESS);
       }
     }
   }
@@ -292,9 +298,11 @@ public class RemoteBlockReader2  impleme
 
   protected RemoteBlockReader2(String file, String bpid, long blockId,
       ReadableByteChannel in, DataChecksum checksum, boolean verifyChecksum,
-      long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock) {
+      long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock,
+      IOStreamPair ioStreams) {
     // Path is used only for printing block and file information in debug
     this.dnSock = dnSock;
+    this.ioStreams = ioStreams;
     this.in = in;
     this.checksum = checksum;
     this.verifyChecksum = verifyChecksum;
@@ -369,24 +377,23 @@ public class RemoteBlockReader2  impleme
    * closing our connection (which we will re-open), but won't affect
    * data correctness.
    */
-  void sendReadResult(Socket sock, Status statusCode) {
-    assert !sentStatusCode : "already sent status code to " + sock;
+  void sendReadResult(Status statusCode) {
+    assert !sentStatusCode : "already sent status code to " + dnSock;
     try {
-      writeReadResult(sock, statusCode);
+      writeReadResult(ioStreams.out, statusCode);
       sentStatusCode = true;
     } catch (IOException e) {
       // It's ok not to be able to send this. But something is probably wrong.
       LOG.info("Could not send read status (" + statusCode + ") to datanode " +
-               sock.getInetAddress() + ": " + e.getMessage());
+               dnSock.getInetAddress() + ": " + e.getMessage());
     }
   }
 
   /**
    * Serialize the actual read result on the wire.
    */
-  static void writeReadResult(Socket sock, Status statusCode)
+  static void writeReadResult(OutputStream out, Status statusCode)
       throws IOException {
-    OutputStream out = NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT);
     
     ClientReadStatusProto.newBuilder()
       .setStatus(statusCode)
@@ -434,25 +441,32 @@ public class RemoteBlockReader2  impleme
    * @param clientName  Client name
    * @return New BlockReader instance, or null on error.
    */
-  public static BlockReader newBlockReader( Socket sock, String file,
+  public static BlockReader newBlockReader(Socket sock, String file,
                                      ExtendedBlock block,
                                      Token<BlockTokenIdentifier> blockToken,
                                      long startOffset, long len,
                                      int bufferSize, boolean verifyChecksum,
-                                     String clientName)
+                                     String clientName,
+                                     DataEncryptionKey encryptionKey,
+                                     IOStreamPair ioStreams)
                                      throws IOException {
+    
+    ReadableByteChannel ch;
+    if (ioStreams.in instanceof SocketInputWrapper) {
+      ch = ((SocketInputWrapper)ioStreams.in).getReadableByteChannel();
+    } else {
+      ch = (ReadableByteChannel) ioStreams.in;
+    }
+    
     // in and out will be closed when sock is closed (by the caller)
     final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
-          NetUtils.getOutputStream(sock,
-              HdfsServerConstants.WRITE_TIMEOUT)));
+          ioStreams.out));
     new Sender(out).readBlock(block, blockToken, clientName, startOffset, len);
 
     //
-    // Get bytes in block, set streams
+    // Get bytes in block
     //
-    SocketInputWrapper sin = NetUtils.getInputStream(sock);
-    ReadableByteChannel ch = sin.getReadableByteChannel();
-    DataInputStream in = new DataInputStream(sin);
+    DataInputStream in = new DataInputStream(ioStreams.in);
 
     BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
         vintPrefixed(in));
@@ -474,7 +488,8 @@ public class RemoteBlockReader2  impleme
     }
 
     return new RemoteBlockReader2(file, block.getBlockPoolId(), block.getBlockId(),
-        ch, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock);
+        ch, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock,
+        ioStreams);
   }
 
   static void checkSuccess(
@@ -498,4 +513,9 @@ public class RemoteBlockReader2  impleme
       }
     }
   }
+
+  @Override
+  public IOStreamPair getStreams() {
+    return ioStreams;
+  }
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java?rev=1370354&r1=1370353&r2=1370354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java Tue Aug  7 16:40:03 2012
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdfs;
 
+import java.io.Closeable;
 import java.net.Socket;
 import java.net.SocketAddress;
 
@@ -29,6 +30,8 @@ import com.google.common.base.Preconditi
 import com.google.common.collect.LinkedListMultimap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.io.IOUtils;
 
 /**
@@ -37,7 +40,7 @@ import org.apache.hadoop.io.IOUtils;
 class SocketCache {
   static final Log LOG = LogFactory.getLog(SocketCache.class);
 
-  private final LinkedListMultimap<SocketAddress, Socket> multimap;
+  private final LinkedListMultimap<SocketAddress, SocketAndStreams> multimap;
   private final int capacity;
 
   /**
@@ -57,21 +60,21 @@ class SocketCache {
    * @param remote  Remote address the socket is connected to.
    * @return  A socket with unknown state, possibly closed underneath. Or null.
    */
-  public synchronized Socket get(SocketAddress remote) {
+  public synchronized SocketAndStreams get(SocketAddress remote) {
     if (capacity <= 0) { // disabled
       return null;
     }
     
-    List<Socket> socklist = multimap.get(remote);
+    List<SocketAndStreams> socklist = multimap.get(remote);
     if (socklist == null) {
       return null;
     }
 
-    Iterator<Socket> iter = socklist.iterator();
+    Iterator<SocketAndStreams> iter = socklist.iterator();
     while (iter.hasNext()) {
-      Socket candidate = iter.next();
+      SocketAndStreams candidate = iter.next();
       iter.remove();
-      if (!candidate.isClosed()) {
+      if (!candidate.sock.isClosed()) {
         return candidate;
       }
     }
@@ -82,10 +85,11 @@ class SocketCache {
    * Give an unused socket to the cache.
    * @param sock socket not used by anyone.
    */
-  public synchronized void put(Socket sock) {
+  public synchronized void put(Socket sock, IOStreamPair ioStreams) {
+    SocketAndStreams s = new SocketAndStreams(sock, ioStreams);
     if (capacity <= 0) {
       // Cache disabled.
-      IOUtils.closeSocket(sock);
+      s.close();
       return;
     }
     
@@ -102,7 +106,7 @@ class SocketCache {
     if (capacity == multimap.size()) {
       evictOldest();
     }
-    multimap.put(remoteAddr, sock);
+    multimap.put(remoteAddr, new SocketAndStreams(sock, ioStreams));
   }
 
   public synchronized int size() {
@@ -113,23 +117,23 @@ class SocketCache {
    * Evict the oldest entry in the cache.
    */
   private synchronized void evictOldest() {
-    Iterator<Entry<SocketAddress, Socket>> iter =
+    Iterator<Entry<SocketAddress, SocketAndStreams>> iter =
       multimap.entries().iterator();
     if (!iter.hasNext()) {
       throw new IllegalStateException("Cannot evict from empty cache!");
     }
-    Entry<SocketAddress, Socket> entry = iter.next();
+    Entry<SocketAddress, SocketAndStreams> entry = iter.next();
     iter.remove();
-    Socket sock = entry.getValue();
-    IOUtils.closeSocket(sock);
+    SocketAndStreams s = entry.getValue();
+    s.close();
   }
 
   /**
    * Empty the cache, and close all sockets.
    */
   public synchronized void clear() {
-    for (Socket sock : multimap.values()) {
-      IOUtils.closeSocket(sock);
+    for (SocketAndStreams s : multimap.values()) {
+      s.close();
     }
     multimap.clear();
   }
@@ -138,5 +142,25 @@ class SocketCache {
   protected void finalize() {
     clear();
   }
+  
+  @InterfaceAudience.Private
+  static class SocketAndStreams implements Closeable {
+    public final Socket sock;
+    public final IOStreamPair ioStreams;
+    
+    public SocketAndStreams(Socket s, IOStreamPair ioStreams) {
+      this.sock = s;
+      this.ioStreams = ioStreams;
+    }
+    
+    @Override
+    public void close() {
+      if (ioStreams != null) { 
+        IOUtils.closeStream(ioStreams.in);
+        IOUtils.closeStream(ioStreams.out);
+      }
+      IOUtils.closeSocket(sock);
+    }
+  }
 
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1370354&r1=1370353&r2=1370354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Tue Aug  7 16:40:03 2012
@@ -44,6 +44,7 @@ import org.apache.hadoop.security.Access
 import org.apache.hadoop.security.KerberosInfo;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenInfo;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
 
@@ -941,4 +942,11 @@ public interface ClientProtocol {
    */
   public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
       throws IOException;
+  
+  /**
+   * @return encryption key so a client can encrypt data sent via the
+   *         DataTransferProtocol to/from DataNodes.
+   * @throws IOException
+   */
+  public DataEncryptionKey getDataEncryptionKey() throws IOException;
 }

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferEncryptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferEncryptor.java?rev=1370354&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferEncryptor.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferEncryptor.java Tue Aug  7 16:40:03 2012
@@ -0,0 +1,505 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+import java.util.TreeMap;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.RealmChoiceCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
+import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.security.SaslInputStream;
+import org.apache.hadoop.security.SaslOutputStream;
+
+import com.google.common.collect.Maps;
+import com.google.protobuf.ByteString;
+
+/**
+ * A class which, given connected input/output streams, will perform a
+ * handshake using those streams based on SASL to produce new Input/Output
+ * streams which will encrypt/decrypt all data written/read from said streams.
+ * Much of this is inspired by or borrowed from the TSaslTransport in Apache
+ * Thrift, but with some HDFS-specific tweaks.
+ */
+@InterfaceAudience.Private
+public class DataTransferEncryptor {
+  
+  public static final Log LOG = LogFactory.getLog(DataTransferEncryptor.class);
+  
+  /**
+   * Sent by clients and validated by servers. We use a number that's unlikely
+   * to ever be sent as the value of the DATA_TRANSFER_VERSION.
+   */
+  private static final int ENCRYPTED_TRANSFER_MAGIC_NUMBER = 0xDEADBEEF;
+  
+  /**
+   * Delimiter for the three-part SASL username string.
+   */
+  private static final String NAME_DELIMITER = " ";
+  
+  // This has to be set as part of the SASL spec, but it don't matter for
+  // our purposes, but may not be empty. It's sent over the wire, so use
+  // a short string.
+  private static final String SERVER_NAME = "0";
+  
+  private static final String PROTOCOL = "hdfs";
+  private static final String MECHANISM = "DIGEST-MD5";
+  private static final Map<String, String> SASL_PROPS = new TreeMap<String, String>();
+  
+  static {
+    SASL_PROPS.put(Sasl.QOP, "auth-conf");
+    SASL_PROPS.put(Sasl.SERVER_AUTH, "true");
+  }
+  
+  /**
+   * Factory method for DNs, where the nonce, keyId, and encryption key are not
+   * yet known. The nonce and keyId will be sent by the client, and the DN
+   * will then use those pieces of info and the secret key shared with the NN
+   * to determine the encryptionKey used for the SASL handshake/encryption.
+   * 
+   * Establishes a secure connection assuming that the party on the other end
+   * has the same shared secret. This does a SASL connection handshake, but not
+   * a general-purpose one. It's specific to the MD5-DIGEST SASL mechanism with
+   * auth-conf enabled. In particular, it doesn't support an arbitrary number of
+   * challenge/response rounds, and we know that the client will never have an
+   * initial response, so we don't check for one.
+   *
+   * @param underlyingOut output stream to write to the other party
+   * @param underlyingIn input stream to read from the other party
+   * @param blockPoolTokenSecretManager secret manager capable of constructing
+   *        encryption key based on keyId, blockPoolId, and nonce
+   * @return a pair of streams which wrap the given streams and encrypt/decrypt
+   *         all data read/written
+   * @throws IOException in the event of error
+   */
+  public static IOStreamPair getEncryptedStreams(
+      OutputStream underlyingOut, InputStream underlyingIn,
+      BlockPoolTokenSecretManager blockPoolTokenSecretManager,
+      String encryptionAlgorithm) throws IOException {
+    
+    DataInputStream in = new DataInputStream(underlyingIn);
+    DataOutputStream out = new DataOutputStream(underlyingOut);
+    
+    Map<String, String> saslProps = Maps.newHashMap(SASL_PROPS);
+    saslProps.put("com.sun.security.sasl.digest.cipher", encryptionAlgorithm);
+    
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Server using encryption algorithm " + encryptionAlgorithm);
+    }
+    
+    SaslParticipant sasl = new SaslParticipant(Sasl.createSaslServer(MECHANISM,
+        PROTOCOL, SERVER_NAME, saslProps,
+        new SaslServerCallbackHandler(blockPoolTokenSecretManager)));
+    
+    int magicNumber = in.readInt();
+    if (magicNumber != ENCRYPTED_TRANSFER_MAGIC_NUMBER) {
+      throw new InvalidMagicNumberException(magicNumber);
+    }
+    try {
+      // step 1
+      performSaslStep1(out, in, sasl);
+      
+      // step 2 (server-side only)
+      byte[] remoteResponse = readSaslMessage(in);
+      byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
+      sendSaslMessage(out, localResponse);
+      
+      // SASL handshake is complete
+      checkSaslComplete(sasl);
+      
+      return sasl.createEncryptedStreamPair(out, in);
+    } catch (IOException ioe) {
+      if (ioe instanceof SaslException &&
+          ioe.getCause() != null &&
+          ioe.getCause() instanceof InvalidEncryptionKeyException) {
+        // This could just be because the client is long-lived and hasn't gotten
+        // a new encryption key from the NN in a while. Upon receiving this
+        // error, the client will get a new encryption key from the NN and retry
+        // connecting to this DN.
+        sendInvalidKeySaslErrorMessage(out, ioe.getCause().getMessage());
+      } else {
+        sendGenericSaslErrorMessage(out, ioe.getMessage());
+      }
+      throw ioe;
+    }
+  }
+  
+  /**
+   * Factory method for clients, where the encryption token is already created.
+   * 
+   * Establishes a secure connection assuming that the party on the other end
+   * has the same shared secret. This does a SASL connection handshake, but not
+   * a general-purpose one. It's specific to the MD5-DIGEST SASL mechanism with
+   * auth-conf enabled. In particular, it doesn't support an arbitrary number of
+   * challenge/response rounds, and we know that the client will never have an
+   * initial response, so we don't check for one.
+   *
+   * @param underlyingOut output stream to write to the other party
+   * @param underlyingIn input stream to read from the other party
+   * @param encryptionKey all info required to establish an encrypted stream
+   * @return a pair of streams which wrap the given streams and encrypt/decrypt
+   *         all data read/written
+   * @throws IOException in the event of error
+   */
+  public static IOStreamPair getEncryptedStreams(
+      OutputStream underlyingOut, InputStream underlyingIn,
+      DataEncryptionKey encryptionKey)
+          throws IOException {
+    
+    Map<String, String> saslProps = Maps.newHashMap(SASL_PROPS);
+    saslProps.put("com.sun.security.sasl.digest.cipher",
+        encryptionKey.encryptionAlgorithm);
+    
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Client using encryption algorithm " +
+          encryptionKey.encryptionAlgorithm);
+    }
+    
+    DataOutputStream out = new DataOutputStream(underlyingOut);
+    DataInputStream in = new DataInputStream(underlyingIn);
+    
+    String userName = getUserNameFromEncryptionKey(encryptionKey);
+    SaslParticipant sasl = new SaslParticipant(Sasl.createSaslClient(
+        new String[] { MECHANISM }, userName, PROTOCOL, SERVER_NAME, saslProps,
+        new SaslClientCallbackHandler(encryptionKey.encryptionKey, userName)));
+    
+    out.writeInt(ENCRYPTED_TRANSFER_MAGIC_NUMBER);
+    out.flush();
+    
+    try {
+      // Start of handshake - "initial response" in SASL terminology.
+      sendSaslMessage(out, new byte[0]);
+      
+      // step 1
+      performSaslStep1(out, in, sasl);
+      
+      // step 2 (client-side only)
+      byte[] remoteResponse = readSaslMessage(in);
+      byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
+      assert localResponse == null;
+      
+      // SASL handshake is complete
+      checkSaslComplete(sasl);
+      
+      return sasl.createEncryptedStreamPair(out, in);
+    } catch (IOException ioe) {
+      sendGenericSaslErrorMessage(out, ioe.getMessage());
+      throw ioe;
+    }
+  }
+  
+  private static void performSaslStep1(DataOutputStream out, DataInputStream in,
+      SaslParticipant sasl) throws IOException {
+    byte[] remoteResponse = readSaslMessage(in);
+    byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
+    sendSaslMessage(out, localResponse);
+  }
+  
+  private static void checkSaslComplete(SaslParticipant sasl) throws IOException {
+    if (!sasl.isComplete()) {
+      throw new IOException("Failed to complete SASL handshake");
+    }
+    
+    if (!sasl.supportsConfidentiality()) {
+      throw new IOException("SASL handshake completed, but channel does not " +
+          "support encryption");
+    }
+  }
+  
+  private static void sendSaslMessage(DataOutputStream out, byte[] payload)
+      throws IOException {
+    sendSaslMessage(out, DataTransferEncryptorStatus.SUCCESS, payload, null);
+  }
+  
+  private static void sendInvalidKeySaslErrorMessage(DataOutputStream out,
+      String message) throws IOException {
+    sendSaslMessage(out, DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY, null,
+        message);
+  }
+  
+  private static void sendGenericSaslErrorMessage(DataOutputStream out,
+      String message) throws IOException {
+    sendSaslMessage(out, DataTransferEncryptorStatus.ERROR, null, message);
+  }
+  
+  private static void sendSaslMessage(OutputStream out,
+      DataTransferEncryptorStatus status, byte[] payload, String message)
+          throws IOException {
+    DataTransferEncryptorMessageProto.Builder builder =
+        DataTransferEncryptorMessageProto.newBuilder();
+    
+    builder.setStatus(status);
+    if (payload != null) {
+      builder.setPayload(ByteString.copyFrom(payload));
+    }
+    if (message != null) {
+      builder.setMessage(message);
+    }
+    
+    DataTransferEncryptorMessageProto proto = builder.build();
+    proto.writeDelimitedTo(out);
+    out.flush();
+  }
+  
+  private static byte[] readSaslMessage(DataInputStream in) throws IOException {
+    DataTransferEncryptorMessageProto proto =
+        DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in));
+    if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
+      throw new InvalidEncryptionKeyException(proto.getMessage());
+    } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
+      throw new IOException(proto.getMessage());
+    } else {
+      return proto.getPayload().toByteArray();
+    }
+  }
+  
+  /**
+   * Set the encryption key when asked by the server-side SASL object.
+   */
+  private static class SaslServerCallbackHandler implements CallbackHandler {
+    
+    private BlockPoolTokenSecretManager blockPoolTokenSecretManager;
+    
+    public SaslServerCallbackHandler(BlockPoolTokenSecretManager
+        blockPoolTokenSecretManager) {
+      this.blockPoolTokenSecretManager = blockPoolTokenSecretManager;
+    }
+
+    @Override
+    public void handle(Callback[] callbacks) throws IOException,
+        UnsupportedCallbackException {
+      NameCallback nc = null;
+      PasswordCallback pc = null;
+      AuthorizeCallback ac = null;
+      for (Callback callback : callbacks) {
+        if (callback instanceof AuthorizeCallback) {
+          ac = (AuthorizeCallback) callback;
+        } else if (callback instanceof PasswordCallback) {
+          pc = (PasswordCallback) callback;
+        } else if (callback instanceof NameCallback) {
+          nc = (NameCallback) callback;
+        } else if (callback instanceof RealmCallback) {
+          continue; // realm is ignored
+        } else {
+          throw new UnsupportedCallbackException(callback,
+              "Unrecognized SASL DIGEST-MD5 Callback: " + callback);
+        }
+      }
+      
+      if (pc != null) {
+        byte[] encryptionKey = getEncryptionKeyFromUserName(
+            blockPoolTokenSecretManager, nc.getDefaultName());
+        pc.setPassword(encryptionKeyToPassword(encryptionKey));
+      }
+      
+      if (ac != null) {
+        ac.setAuthorized(true);
+        ac.setAuthorizedID(ac.getAuthorizationID());
+      }
+      
+    }
+    
+  }
+  
+  /**
+   * Set the encryption key when asked by the client-side SASL object.
+   */
+  private static class SaslClientCallbackHandler implements CallbackHandler {
+    
+    private byte[] encryptionKey;
+    private String userName;
+    
+    public SaslClientCallbackHandler(byte[] encryptionKey, String userName) {
+      this.encryptionKey = encryptionKey;
+      this.userName = userName;
+    }
+
+    @Override
+    public void handle(Callback[] callbacks) throws IOException,
+        UnsupportedCallbackException {
+      NameCallback nc = null;
+      PasswordCallback pc = null;
+      RealmCallback rc = null;
+      for (Callback callback : callbacks) {
+        if (callback instanceof RealmChoiceCallback) {
+          continue;
+        } else if (callback instanceof NameCallback) {
+          nc = (NameCallback) callback;
+        } else if (callback instanceof PasswordCallback) {
+          pc = (PasswordCallback) callback;
+        } else if (callback instanceof RealmCallback) {
+          rc = (RealmCallback) callback;
+        } else {
+          throw new UnsupportedCallbackException(callback,
+              "Unrecognized SASL client callback");
+        }
+      }
+      if (nc != null) {
+        nc.setName(userName);
+      }
+      if (pc != null) {
+        pc.setPassword(encryptionKeyToPassword(encryptionKey));
+      }
+      if (rc != null) {
+        rc.setText(rc.getDefaultText());
+      }
+    }
+    
+  }
+  
+  /**
+   * The SASL username consists of the keyId, blockPoolId, and nonce with the
+   * first two encoded as Strings, and the third encoded using Base64. The
+   * fields are each separated by a single space.
+   * 
+   * @param encryptionKey the encryption key to encode as a SASL username.
+   * @return encoded username containing keyId, blockPoolId, and nonce
+   */
+  private static String getUserNameFromEncryptionKey(
+      DataEncryptionKey encryptionKey) {
+    return encryptionKey.keyId + NAME_DELIMITER +
+        encryptionKey.blockPoolId + NAME_DELIMITER +
+        new String(Base64.encodeBase64(encryptionKey.nonce, false));
+  }
+  
+  /**
+   * Given a secret manager and a username encoded as described above, determine
+   * the encryption key.
+   * 
+   * @param blockPoolTokenSecretManager to determine the encryption key.
+   * @param userName containing the keyId, blockPoolId, and nonce.
+   * @return secret encryption key.
+   * @throws IOException
+   */
+  private static byte[] getEncryptionKeyFromUserName(
+      BlockPoolTokenSecretManager blockPoolTokenSecretManager, String userName)
+      throws IOException {
+    String[] nameComponents = userName.split(NAME_DELIMITER);
+    if (nameComponents.length != 3) {
+      throw new IOException("Provided name '" + userName + "' has " +
+          nameComponents.length + " components instead of the expected 3.");
+    }
+    int keyId = Integer.parseInt(nameComponents[0]);
+    String blockPoolId = nameComponents[1];
+    byte[] nonce = Base64.decodeBase64(nameComponents[2]);
+    return blockPoolTokenSecretManager.retrieveDataEncryptionKey(keyId,
+        blockPoolId, nonce);
+  }
+  
+  private static char[] encryptionKeyToPassword(byte[] encryptionKey) {
+    return new String(Base64.encodeBase64(encryptionKey, false)).toCharArray();
+  }
+  
+  /**
+   * Strongly inspired by Thrift's TSaslTransport class.
+   * 
+   * Used to abstract over the <code>SaslServer</code> and
+   * <code>SaslClient</code> classes, which share a lot of their interface, but
+   * unfortunately don't share a common superclass.
+   */
+  private static class SaslParticipant {
+    // One of these will always be null.
+    public SaslServer saslServer;
+    public SaslClient saslClient;
+
+    public SaslParticipant(SaslServer saslServer) {
+      this.saslServer = saslServer;
+    }
+
+    public SaslParticipant(SaslClient saslClient) {
+      this.saslClient = saslClient;
+    }
+    
+    public byte[] evaluateChallengeOrResponse(byte[] challengeOrResponse) throws SaslException {
+      if (saslClient != null) {
+        return saslClient.evaluateChallenge(challengeOrResponse);
+      } else {
+        return saslServer.evaluateResponse(challengeOrResponse);
+      }
+    }
+
+    public boolean isComplete() {
+      if (saslClient != null)
+        return saslClient.isComplete();
+      else
+        return saslServer.isComplete();
+    }
+    
+    public boolean supportsConfidentiality() {
+      String qop = null;
+      if (saslClient != null) {
+        qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP);
+      } else {
+        qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
+      }
+      return qop != null && qop.equals("auth-conf");
+    }
+    
+    // Return some input/output streams that will henceforth have their
+    // communication encrypted.
+    private IOStreamPair createEncryptedStreamPair(
+        DataOutputStream out, DataInputStream in) {
+      if (saslClient != null) {
+        return new IOStreamPair(
+            new SaslInputStream(in, saslClient),
+            new SaslOutputStream(out, saslClient));
+      } else {
+        return new IOStreamPair(
+            new SaslInputStream(in, saslServer),
+            new SaslOutputStream(out, saslServer));
+      }
+    }
+  }
+  
+  @InterfaceAudience.Private
+  public static class InvalidMagicNumberException extends IOException {
+    
+    private static final long serialVersionUID = 1L;
+
+    public InvalidMagicNumberException(int magicNumber) {
+      super(String.format("Received %x instead of %x from client.",
+          magicNumber, ENCRYPTED_TRANSFER_MAGIC_NUMBER));
+    }
+  }
+  
+}

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java?rev=1370354&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java Tue Aug  7 16:40:03 2012
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * A little struct class to wrap an InputStream and an OutputStream.
+ */
+@InterfaceAudience.Private
+public class IOStreamPair {
+  public final InputStream in;
+  public final OutputStream out;
+  
+  public IOStreamPair(InputStream in, OutputStream out) {
+    this.in = in;
+    this.out = out;
+  }
+}
\ No newline at end of file

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/InvalidEncryptionKeyException.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/InvalidEncryptionKeyException.java?rev=1370354&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/InvalidEncryptionKeyException.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/InvalidEncryptionKeyException.java Tue Aug  7 16:40:03 2012
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Encryption key verification failed.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class InvalidEncryptionKeyException extends IOException {
+  private static final long serialVersionUID = 0l;
+
+  public InvalidEncryptionKeyException() {
+    super();
+  }
+
+  public InvalidEncryptionKeyException(String msg) {
+    super(msg);
+  }
+}

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java?rev=1370354&r1=1370353&r2=1370354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java Tue Aug  7 16:40:03 2012
@@ -38,10 +38,10 @@ import org.apache.hadoop.hdfs.protocol.p
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public abstract class Receiver implements DataTransferProtocol {
-  protected final DataInputStream in;
-
-  /** Create a receiver for DataTransferProtocol with a socket. */
-  protected Receiver(final DataInputStream in) {
+  protected DataInputStream in;
+  
+  /** Initialize a receiver for DataTransferProtocol with a socket. */
+  protected void initialize(final DataInputStream in) {
     this.in = in;
   }
 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java?rev=1370354&r1=1370353&r2=1370354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java Tue Aug  7 16:40:03 2012
@@ -58,6 +58,8 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FsyncResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto.Builder;
@@ -127,6 +129,7 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.io.Text;
 
@@ -830,4 +833,18 @@ public class ClientNamenodeProtocolServe
       throw new ServiceException(e);
     }
   }
+
+  @Override
+  public GetDataEncryptionKeyResponseProto getDataEncryptionKey(
+      RpcController controller, GetDataEncryptionKeyRequestProto request)
+      throws ServiceException {
+    try {
+      DataEncryptionKey encryptionKey = server.getDataEncryptionKey();
+      return GetDataEncryptionKeyResponseProto.newBuilder()
+          .setDataEncryptionKey(PBHelper.convert(encryptionKey))
+          .build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java?rev=1370354&r1=1370353&r2=1370354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java Tue Aug  7 16:40:03 2012
@@ -63,6 +63,7 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FinalizeUpgradeRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FsyncRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetContentSummaryRequestProto;
@@ -99,6 +100,7 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
@@ -815,9 +817,22 @@ public class ClientNamenodeProtocolTrans
         ClientNamenodeProtocolPB.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER,
         RPC.getProtocolVersion(ClientNamenodeProtocolPB.class), methodName);
   }
+  
+  @Override
+  public DataEncryptionKey getDataEncryptionKey() throws IOException {
+    GetDataEncryptionKeyRequestProto req = GetDataEncryptionKeyRequestProto
+        .newBuilder().build();
+    try {
+      return PBHelper.convert(rpcProxy.getDataEncryptionKey(null, req)
+          .getDataEncryptionKey());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
 
   @Override
   public Object getUnderlyingProxyObject() {
     return rpcProxy;
   }
+  
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1370354&r1=1370353&r2=1370354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Tue Aug  7 16:40:03 2012
@@ -62,6 +62,7 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReportProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.UpgradeCommandProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DataEncryptionKeyProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTokenIdentifierProto;
@@ -96,6 +97,7 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.UpgradeStatusReportProto;
 import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalInfoProto;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.BlockKey;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
@@ -970,12 +972,37 @@ public class PBHelper {
         .setIsLastBlockComplete(lb.isLastBlockComplete()).build();
   }
   
+  // DataEncryptionKey
+  public static DataEncryptionKey convert(DataEncryptionKeyProto bet) {
+    String encryptionAlgorithm = bet.getEncryptionAlgorithm();
+    return new DataEncryptionKey(bet.getKeyId(),
+        bet.getBlockPoolId(),
+        bet.getNonce().toByteArray(),
+        bet.getEncryptionKey().toByteArray(),
+        bet.getExpiryDate(),
+        encryptionAlgorithm.isEmpty() ? null : encryptionAlgorithm);
+  }
+  
+  public static DataEncryptionKeyProto convert(DataEncryptionKey bet) {
+    DataEncryptionKeyProto.Builder b = DataEncryptionKeyProto.newBuilder()
+        .setKeyId(bet.keyId)
+        .setBlockPoolId(bet.blockPoolId)
+        .setNonce(ByteString.copyFrom(bet.nonce))
+        .setEncryptionKey(ByteString.copyFrom(bet.encryptionKey))
+        .setExpiryDate(bet.expiryDate);
+    if (bet.encryptionAlgorithm != null) {
+      b.setEncryptionAlgorithm(bet.encryptionAlgorithm);
+    }
+    return b.build();
+  }
+  
   public static FsServerDefaults convert(FsServerDefaultsProto fs) {
     if (fs == null) return null;
     return new FsServerDefaults(
         fs.getBlockSize(), fs.getBytesPerChecksum(), 
         fs.getWritePacketSize(), (short) fs.getReplication(),
-        fs.getFileBufferSize());
+        fs.getFileBufferSize(),
+        fs.getEncryptDataTransfer());
   }
   
   public static FsServerDefaultsProto convert(FsServerDefaults fs) {
@@ -983,7 +1010,10 @@ public class PBHelper {
     return FsServerDefaultsProto.newBuilder().
       setBlockSize(fs.getBlockSize()).
       setBytesPerChecksum(fs.getBytesPerChecksum()).
-      setWritePacketSize(fs.getWritePacketSize()).setReplication(fs.getReplication()).setFileBufferSize(fs.getFileBufferSize()).build();
+      setWritePacketSize(fs.getWritePacketSize())
+      .setReplication(fs.getReplication())
+      .setFileBufferSize(fs.getFileBufferSize())
+      .setEncryptDataTransfer(fs.getEncryptDataTransfer()).build();
   }
   
   public static FsPermissionProto convert(FsPermission p) {

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java?rev=1370354&r1=1370353&r2=1370354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java Tue Aug  7 16:40:03 2012
@@ -119,4 +119,13 @@ public class BlockPoolTokenSecretManager
       btsm.clearAllKeysForTesting();
     }
   }
+
+  public DataEncryptionKey generateDataEncryptionKey(String blockPoolId) {
+    return get(blockPoolId).generateDataEncryptionKey();
+  }
+  
+  public byte[] retrieveDataEncryptionKey(int keyId, String blockPoolId,
+      byte[] nonce) throws IOException {
+    return get(blockPoolId).retrieveDataEncryptionKey(keyId, nonce);
+  }
 }