You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by at...@apache.org on 2012/08/07 18:46:11 UTC

svn commit: r1370360 [2/3] - in /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/protocol/ src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ src/ma...

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1370360&r1=1370359&r2=1370360&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Tue Aug  7 16:46:03 2012
@@ -62,6 +62,7 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReportProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.UpgradeCommandProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DataEncryptionKeyProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTokenIdentifierProto;
@@ -96,6 +97,7 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.UpgradeStatusReportProto;
 import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalInfoProto;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.BlockKey;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
@@ -970,12 +972,37 @@ public class PBHelper {
         .setIsLastBlockComplete(lb.isLastBlockComplete()).build();
   }
   
+  // DataEncryptionKey
+  public static DataEncryptionKey convert(DataEncryptionKeyProto bet) {
+    String encryptionAlgorithm = bet.getEncryptionAlgorithm();
+    return new DataEncryptionKey(bet.getKeyId(),
+        bet.getBlockPoolId(),
+        bet.getNonce().toByteArray(),
+        bet.getEncryptionKey().toByteArray(),
+        bet.getExpiryDate(),
+        encryptionAlgorithm.isEmpty() ? null : encryptionAlgorithm);
+  }
+  
+  public static DataEncryptionKeyProto convert(DataEncryptionKey bet) {
+    DataEncryptionKeyProto.Builder b = DataEncryptionKeyProto.newBuilder()
+        .setKeyId(bet.keyId)
+        .setBlockPoolId(bet.blockPoolId)
+        .setNonce(ByteString.copyFrom(bet.nonce))
+        .setEncryptionKey(ByteString.copyFrom(bet.encryptionKey))
+        .setExpiryDate(bet.expiryDate);
+    if (bet.encryptionAlgorithm != null) {
+      b.setEncryptionAlgorithm(bet.encryptionAlgorithm);
+    }
+    return b.build();
+  }
+  
   public static FsServerDefaults convert(FsServerDefaultsProto fs) {
     if (fs == null) return null;
     return new FsServerDefaults(
         fs.getBlockSize(), fs.getBytesPerChecksum(), 
         fs.getWritePacketSize(), (short) fs.getReplication(),
-        fs.getFileBufferSize());
+        fs.getFileBufferSize(),
+        fs.getEncryptDataTransfer());
   }
   
   public static FsServerDefaultsProto convert(FsServerDefaults fs) {
@@ -983,7 +1010,10 @@ public class PBHelper {
     return FsServerDefaultsProto.newBuilder().
       setBlockSize(fs.getBlockSize()).
       setBytesPerChecksum(fs.getBytesPerChecksum()).
-      setWritePacketSize(fs.getWritePacketSize()).setReplication(fs.getReplication()).setFileBufferSize(fs.getFileBufferSize()).build();
+      setWritePacketSize(fs.getWritePacketSize())
+      .setReplication(fs.getReplication())
+      .setFileBufferSize(fs.getFileBufferSize())
+      .setEncryptDataTransfer(fs.getEncryptDataTransfer()).build();
   }
   
   public static FsPermissionProto convert(FsPermission p) {

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java?rev=1370360&r1=1370359&r2=1370360&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java Tue Aug  7 16:46:03 2012
@@ -119,4 +119,13 @@ public class BlockPoolTokenSecretManager
       btsm.clearAllKeysForTesting();
     }
   }
+
+  public DataEncryptionKey generateDataEncryptionKey(String blockPoolId) {
+    return get(blockPoolId).generateDataEncryptionKey();
+  }
+  
+  public byte[] retrieveDataEncryptionKey(int keyId, String blockPoolId,
+      byte[] nonce) throws IOException {
+    return get(blockPoolId).retrieveDataEncryptionKey(keyId, nonce);
+  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java?rev=1370360&r1=1370359&r2=1370360&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java Tue Aug  7 16:46:03 2012
@@ -32,6 +32,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
@@ -74,6 +75,10 @@ public class BlockTokenSecretManager ext
   private BlockKey currentKey;
   private BlockKey nextKey;
   private Map<Integer, BlockKey> allKeys;
+  private String blockPoolId;
+  private String encryptionAlgorithm;
+  
+  private SecureRandom nonceGenerator = new SecureRandom();
 
   public static enum AccessMode {
     READ, WRITE, COPY, REPLACE
@@ -86,8 +91,9 @@ public class BlockTokenSecretManager ext
    * @param tokenLifetime how long an individual token is valid
    */
   public BlockTokenSecretManager(long keyUpdateInterval,
-      long tokenLifetime) {
-    this(false, keyUpdateInterval, tokenLifetime);
+      long tokenLifetime, String blockPoolId, String encryptionAlgorithm) {
+    this(false, keyUpdateInterval, tokenLifetime, blockPoolId,
+        encryptionAlgorithm);
   }
   
   /**
@@ -100,8 +106,10 @@ public class BlockTokenSecretManager ext
    * @param otherNnId the NN ID of the other NN in an HA setup
    */
   public BlockTokenSecretManager(long keyUpdateInterval,
-      long tokenLifetime, int nnIndex) {
-    this(true, keyUpdateInterval, tokenLifetime);
+      long tokenLifetime, int nnIndex, String blockPoolId,
+      String encryptionAlgorithm) {
+    this(true, keyUpdateInterval, tokenLifetime, blockPoolId,
+        encryptionAlgorithm);
     Preconditions.checkArgument(nnIndex == 0 || nnIndex == 1);
     this.nnIndex = nnIndex;
     setSerialNo(new SecureRandom().nextInt());
@@ -109,17 +117,24 @@ public class BlockTokenSecretManager ext
   }
   
   private BlockTokenSecretManager(boolean isMaster, long keyUpdateInterval,
-      long tokenLifetime) {
+      long tokenLifetime, String blockPoolId, String encryptionAlgorithm) {
     this.isMaster = isMaster;
     this.keyUpdateInterval = keyUpdateInterval;
     this.tokenLifetime = tokenLifetime;
     this.allKeys = new HashMap<Integer, BlockKey>();
+    this.blockPoolId = blockPoolId;
+    this.encryptionAlgorithm = encryptionAlgorithm;
+    generateKeys();
   }
   
   @VisibleForTesting
   public synchronized void setSerialNo(int serialNo) {
     this.serialNo = (serialNo & LOW_MASK) | (nnIndex << 31);
   }
+  
+  public void setBlockPoolId(String blockPoolId) {
+    this.blockPoolId = blockPoolId;
+  }
 
   /** Initialize block keys */
   private synchronized void generateKeys() {
@@ -371,6 +386,49 @@ public class BlockTokenSecretManager ext
     return createPassword(identifier.getBytes(), key.getKey());
   }
   
+  /**
+   * Generate a data encryption key for this block pool, using the current
+   * BlockKey.
+   * 
+   * @return a data encryption key which may be used to encrypt traffic
+   *         over the DataTransferProtocol
+   */
+  public DataEncryptionKey generateDataEncryptionKey() {
+    byte[] nonce = new byte[8];
+    nonceGenerator.nextBytes(nonce);
+    BlockKey key = null;
+    synchronized (this) {
+      key = currentKey;
+    }
+    byte[] encryptionKey = createPassword(nonce, key.getKey());
+    return new DataEncryptionKey(key.getKeyId(), blockPoolId, nonce,
+        encryptionKey, Time.now() + tokenLifetime,
+        encryptionAlgorithm);
+  }
+  
+  /**
+   * Recreate an encryption key based on the given key id and nonce.
+   * 
+   * @param keyId identifier of the secret key used to generate the encryption key.
+   * @param nonce random value used to create the encryption key
+   * @return the encryption key which corresponds to this (keyId, blockPoolId, nonce)
+   * @throws InvalidToken
+   * @throws InvalidEncryptionKeyException 
+   */
+  public byte[] retrieveDataEncryptionKey(int keyId, byte[] nonce)
+      throws InvalidEncryptionKeyException {
+    BlockKey key = null;
+    synchronized (this) {
+      key = allKeys.get(keyId);
+      if (key == null) {
+        throw new InvalidEncryptionKeyException("Can't re-compute encryption key"
+            + " for nonce, since the required block key (keyID=" + keyId
+            + ") doesn't exist. Current key: " + currentKey.getKeyId());
+      }
+    }
+    return createPassword(nonce, key.getKey());
+  }
+  
   @VisibleForTesting
   public synchronized void setKeyUpdateIntervalForTesting(long millis) {
     this.keyUpdateInterval = millis;

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/DataEncryptionKey.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/DataEncryptionKey.java?rev=1370360&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/DataEncryptionKey.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/DataEncryptionKey.java Tue Aug  7 16:46:03 2012
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.security.token.block;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * A little struct class to contain all fields required to perform encryption of
+ * the DataTransferProtocol.
+ */
+@InterfaceAudience.Private
+public class DataEncryptionKey {
+  public final int keyId;
+  public final String blockPoolId;
+  public final byte[] nonce;
+  public final byte[] encryptionKey;
+  public final long expiryDate;
+  public final String encryptionAlgorithm;
+  
+  public DataEncryptionKey(int keyId, String blockPoolId, byte[] nonce,
+      byte[] encryptionKey, long expiryDate, String encryptionAlgorithm) {
+    this.keyId = keyId;
+    this.blockPoolId = blockPoolId;
+    this.nonce = nonce;
+    this.encryptionKey = encryptionKey;
+    this.expiryDate = expiryDate;
+    this.encryptionAlgorithm = encryptionAlgorithm;
+  }
+  
+  @Override
+  public String toString() {
+    return keyId + "/" + blockPoolId + "/" + nonce.length + "/" +
+        encryptionKey.length;
+  }
+}

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1370360&r1=1370359&r2=1370360&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Tue Aug  7 16:46:03 2012
@@ -24,6 +24,8 @@ import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.Socket;
 import java.net.URI;
 import java.text.DateFormat;
@@ -57,6 +59,8 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
@@ -311,11 +315,22 @@ public class Balancer {
             NetUtils.createSocketAddr(target.datanode.getXferAddr()),
             HdfsServerConstants.READ_TIMEOUT);
         sock.setKeepAlive(true);
-        out = new DataOutputStream( new BufferedOutputStream(
-            sock.getOutputStream(), HdfsConstants.IO_FILE_BUFFER_SIZE));
+        
+        OutputStream unbufOut = sock.getOutputStream();
+        InputStream unbufIn = sock.getInputStream();
+        if (nnc.getDataEncryptionKey() != null) {
+          IOStreamPair encryptedStreams =
+              DataTransferEncryptor.getEncryptedStreams(
+                  unbufOut, unbufIn, nnc.getDataEncryptionKey());
+          unbufOut = encryptedStreams.out;
+          unbufIn = encryptedStreams.in;
+        }
+        out = new DataOutputStream(new BufferedOutputStream(unbufOut,
+            HdfsConstants.IO_FILE_BUFFER_SIZE));
+        in = new DataInputStream(new BufferedInputStream(unbufIn,
+            HdfsConstants.IO_FILE_BUFFER_SIZE));
+        
         sendRequest(out);
-        in = new DataInputStream( new BufferedInputStream(
-            sock.getInputStream(), HdfsConstants.IO_FILE_BUFFER_SIZE));
         receiveResponse(in);
         bytesMoved.inc(block.getNumBytes());
         LOG.info( "Moving block " + block.getBlock().getBlockId() +

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java?rev=1370360&r1=1370359&r2=1370360&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java Tue Aug  7 16:46:03 2012
@@ -29,10 +29,12 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
@@ -60,10 +62,12 @@ class NameNodeConnector {
   final OutputStream out;
 
   private final boolean isBlockTokenEnabled;
+  private final boolean encryptDataTransfer;
   private boolean shouldRun;
   private long keyUpdaterInterval;
   private BlockTokenSecretManager blockTokenSecretManager;
   private Daemon keyupdaterthread; // AccessKeyUpdater thread
+  private DataEncryptionKey encryptionKey;
 
   NameNodeConnector(URI nameNodeUri,
       Configuration conf) throws IOException {
@@ -88,8 +92,11 @@ class NameNodeConnector {
       LOG.info("Block token params received from NN: keyUpdateInterval="
           + blockKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime="
           + blockTokenLifetime / (60 * 1000) + " min(s)");
+      String encryptionAlgorithm = conf.get(
+          DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
       this.blockTokenSecretManager = new BlockTokenSecretManager(
-          blockKeyUpdateInterval, blockTokenLifetime);
+          blockKeyUpdateInterval, blockTokenLifetime, blockpoolID,
+          encryptionAlgorithm);
       this.blockTokenSecretManager.addKeys(keys);
       /*
        * Balancer should sync its block keys with NN more frequently than NN
@@ -102,7 +109,8 @@ class NameNodeConnector {
       this.shouldRun = true;
       this.keyupdaterthread.start();
     }
-
+    this.encryptDataTransfer = fs.getServerDefaults(new Path("/"))
+        .getEncryptDataTransfer();
     // Check if there is another balancer running.
     // Exit if there is another one running.
     out = checkAndMarkRunningBalancer(); 
@@ -126,6 +134,20 @@ class NameNodeConnector {
           BlockTokenSecretManager.AccessMode.COPY));
     }
   }
+  
+  DataEncryptionKey getDataEncryptionKey()
+      throws IOException {
+    if (encryptDataTransfer) {
+      synchronized (this) {
+        if (encryptionKey == null) {
+          encryptionKey = blockTokenSecretManager.generateDataEncryptionKey();
+        }
+        return encryptionKey;
+      }
+    } else {
+      return null;
+    }
+  }
 
   /* The idea for making sure that there is no more than one balancer
    * running in an HDFS is to create a file in the HDFS, writes the IP address
@@ -208,4 +230,4 @@ class NameNodeConnector {
       }
     }
   }
-}
\ No newline at end of file
+}

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1370360&r1=1370359&r2=1370360&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Tue Aug  7 16:46:03 2012
@@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.protocol.E
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
@@ -206,6 +207,9 @@ public class BlockManager {
 
   /** variable to enable check for enough racks */
   final boolean shouldCheckForEnoughRacks;
+  
+  // whether or not to issue block encryption keys.
+  final boolean encryptDataTransfer;
 
   /**
    * When running inside a Standby node, the node may receive block reports
@@ -286,12 +290,18 @@ public class BlockManager {
     this.replicationRecheckInterval = 
       conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 
                   DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000L;
+    
+    this.encryptDataTransfer =
+        conf.getBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY,
+            DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
+    
     LOG.info("defaultReplication         = " + defaultReplication);
     LOG.info("maxReplication             = " + maxReplication);
     LOG.info("minReplication             = " + minReplication);
     LOG.info("maxReplicationStreams      = " + maxReplicationStreams);
     LOG.info("shouldCheckForEnoughRacks  = " + shouldCheckForEnoughRacks);
     LOG.info("replicationRecheckInterval = " + replicationRecheckInterval);
+    LOG.info("encryptDataTransfer        = " + encryptDataTransfer);
   }
 
   private static BlockTokenSecretManager createBlockTokenSecretManager(
@@ -311,10 +321,14 @@ public class BlockManager {
     final long lifetimeMin = conf.getLong(
         DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_KEY, 
         DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_DEFAULT);
+    final String encryptionAlgorithm = conf.get(
+        DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
     LOG.info(DFSConfigKeys.DFS_BLOCK_ACCESS_KEY_UPDATE_INTERVAL_KEY
         + "=" + updateMin + " min(s), "
         + DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_KEY
-        + "=" + lifetimeMin + " min(s)");
+        + "=" + lifetimeMin + " min(s), "
+        + DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY
+        + "=" + encryptionAlgorithm);
     
     String nsId = DFSUtil.getNamenodeNameServiceId(conf);
     boolean isHaEnabled = HAUtil.isHAEnabled(conf, nsId);
@@ -323,10 +337,17 @@ public class BlockManager {
       String thisNnId = HAUtil.getNameNodeId(conf, nsId);
       String otherNnId = HAUtil.getNameNodeIdOfOtherNode(conf, nsId);
       return new BlockTokenSecretManager(updateMin*60*1000L,
-          lifetimeMin*60*1000L, thisNnId.compareTo(otherNnId) < 0 ? 0 : 1);
+          lifetimeMin*60*1000L, thisNnId.compareTo(otherNnId) < 0 ? 0 : 1, null,
+          encryptionAlgorithm);
     } else {
       return new BlockTokenSecretManager(updateMin*60*1000L,
-          lifetimeMin*60*1000L, 0);
+          lifetimeMin*60*1000L, 0, null, encryptionAlgorithm);
+    }
+  }
+  
+  public void setBlockPoolId(String blockPoolId) {
+    if (isBlockTokenEnabled()) {
+      blockTokenSecretManager.setBlockPoolId(blockPoolId);
     }
   }
 
@@ -793,6 +814,14 @@ public class BlockManager {
       nodeinfo.needKeyUpdate = false;
     }
   }
+  
+  public DataEncryptionKey generateDataEncryptionKey() {
+    if (isBlockTokenEnabled() && encryptDataTransfer) {
+      return blockTokenSecretManager.generateDataEncryptionKey();
+    } else {
+      return null;
+    }
+  }
 
   /**
    * Clamp the specified replication between the minimum and the maximum

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1370360&r1=1370359&r2=1370360&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Tue Aug  7 16:46:03 2012
@@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
@@ -195,7 +196,8 @@ public class JspHelper {
   public static void streamBlockInAscii(InetSocketAddress addr, String poolId,
       long blockId, Token<BlockTokenIdentifier> blockToken, long genStamp,
       long blockSize, long offsetIntoBlock, long chunkSizeToView,
-      JspWriter out, Configuration conf) throws IOException {
+      JspWriter out, Configuration conf, DataEncryptionKey encryptionKey)
+          throws IOException {
     if (chunkSizeToView == 0) return;
     Socket s = NetUtils.getDefaultSocketFactory(conf).createSocket();
     s.connect(addr, HdfsServerConstants.READ_TIMEOUT);
@@ -208,7 +210,7 @@ public class JspHelper {
     BlockReader blockReader = BlockReaderFactory.newBlockReader(
         conf, s, file,
         new ExtendedBlock(poolId, blockId, 0, genStamp), blockToken,
-        offsetIntoBlock, amtToRead);
+        offsetIntoBlock, amtToRead, encryptionKey);
         
     byte[] buf = new byte[(int)amtToRead];
     int readOffset = 0;

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java?rev=1370360&r1=1370359&r2=1370360&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java Tue Aug  7 16:46:03 2012
@@ -33,7 +33,9 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_DEFAULT;
-
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@@ -52,6 +54,7 @@ class DNConf {
   final boolean syncBehindWrites;
   final boolean dropCacheBehindReads;
   final boolean syncOnClose;
+  final boolean encryptDataTransfer;
   
 
   final long readaheadLength;
@@ -62,6 +65,7 @@ class DNConf {
   final int writePacketSize;
   
   final String minimumNameNodeVersion;
+  final String encryptionAlgorithm;
 
   public DNConf(Configuration conf) {
     socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
@@ -117,6 +121,10 @@ class DNConf {
 
     this.minimumNameNodeVersion = conf.get(DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_KEY,
         DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_DEFAULT);
+    
+    this.encryptDataTransfer = conf.getBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY,
+        DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
+    this.encryptionAlgorithm = conf.get(DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
   }
   
   // We get minimumNameNodeVersion via a method so it can be mocked out in tests.

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1370360&r1=1370359&r2=1370360&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Aug  7 16:46:03 2012
@@ -53,6 +53,7 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
@@ -100,6 +101,8 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ClientDatanodeProtocolService;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto;
@@ -737,8 +740,6 @@ public class DataNode extends Configured
             + " tokens, or none may be.");
       }
     }
-    // TODO should we check that all federated nns are either enabled or
-    // disabled?
     if (!isBlockTokenEnabled) return;
     
     if (!blockPoolTokenSecretManager.isBlockPoolRegistered(blockPoolId)) {
@@ -750,7 +751,8 @@ public class DataNode extends Configured
           + " min(s), tokenLifetime=" + blockTokenLifetime / (60 * 1000)
           + " min(s)");
       final BlockTokenSecretManager secretMgr = 
-        new BlockTokenSecretManager(0, blockTokenLifetime);
+          new BlockTokenSecretManager(0, blockTokenLifetime, blockPoolId,
+              dnConf.encryptionAlgorithm);
       blockPoolTokenSecretManager.addBlockPool(blockPoolId, secretMgr);
     }
   }
@@ -1390,9 +1392,21 @@ public class DataNode extends Configured
 
         long writeTimeout = dnConf.socketWriteTimeout + 
                             HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
-        OutputStream baseStream = NetUtils.getOutputStream(sock, writeTimeout);
-        out = new DataOutputStream(new BufferedOutputStream(baseStream,
+        OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
+        InputStream unbufIn = NetUtils.getInputStream(sock);
+        if (dnConf.encryptDataTransfer) {
+          IOStreamPair encryptedStreams =
+              DataTransferEncryptor.getEncryptedStreams(
+                  unbufOut, unbufIn,
+                  blockPoolTokenSecretManager.generateDataEncryptionKey(
+                      b.getBlockPoolId()));
+          unbufOut = encryptedStreams.out;
+          unbufIn = encryptedStreams.in;
+        }
+        
+        out = new DataOutputStream(new BufferedOutputStream(unbufOut,
             HdfsConstants.SMALL_BUFFER_SIZE));
+        in = new DataInputStream(unbufIn);
         blockSender = new BlockSender(b, 0, b.getNumBytes(), 
             false, false, DataNode.this, null);
         DatanodeInfo srcNode = new DatanodeInfo(bpReg);
@@ -1410,7 +1424,7 @@ public class DataNode extends Configured
             stage, 0, 0, 0, 0, blockSender.getChecksum());
 
         // send data & checksum
-        blockSender.sendBlock(out, baseStream, null);
+        blockSender.sendBlock(out, unbufOut, null);
 
         // no response necessary
         LOG.info(getClass().getSimpleName() + ": Transmitted " + b
@@ -1418,7 +1432,6 @@ public class DataNode extends Configured
 
         // read ack
         if (isClient) {
-          in = new DataInputStream(NetUtils.getInputStream(sock));
           DNTransferAckProto closeAck = DNTransferAckProto.parseFrom(
               HdfsProtoUtil.vintPrefixed(in));
           if (LOG.isDebugEnabled()) {

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1370360&r1=1370359&r2=1370360&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Tue Aug  7 16:46:03 2012
@@ -29,6 +29,7 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InterruptedIOException;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
@@ -43,7 +44,10 @@ import org.apache.hadoop.hdfs.protocol.E
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor.InvalidMagicNumberException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Receiver;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@@ -84,7 +88,8 @@ class DataXceiver extends Receiver imple
   private final DataXceiverServer dataXceiverServer;
 
   private long opStartTime; //the start time of receiving an Op
-  private final SocketInputWrapper socketInputWrapper;
+  private final SocketInputWrapper socketIn;
+  private OutputStream socketOut;
 
   /**
    * Client Name used in previous operation. Not available on first request
@@ -94,23 +99,19 @@ class DataXceiver extends Receiver imple
   
   public static DataXceiver create(Socket s, DataNode dn,
       DataXceiverServer dataXceiverServer) throws IOException {
-    
-    SocketInputWrapper iw = NetUtils.getInputStream(s);
-    return new DataXceiver(s, iw, dn, dataXceiverServer);
+    return new DataXceiver(s, dn, dataXceiverServer);
   }
   
   private DataXceiver(Socket s, 
-      SocketInputWrapper socketInput,
       DataNode datanode, 
       DataXceiverServer dataXceiverServer) throws IOException {
-    super(new DataInputStream(new BufferedInputStream(
-        socketInput, HdfsConstants.SMALL_BUFFER_SIZE)));
 
     this.s = s;
-    this.socketInputWrapper = socketInput;
+    this.dnConf = datanode.getDnConf();
+    this.socketIn = NetUtils.getInputStream(s);
+    this.socketOut = NetUtils.getOutputStream(s, dnConf.socketWriteTimeout);
     this.isLocal = s.getInetAddress().equals(s.getLocalAddress());
     this.datanode = datanode;
-    this.dnConf = datanode.getDnConf();
     this.dataXceiverServer = dataXceiverServer;
     remoteAddress = s.getRemoteSocketAddress().toString();
     localAddress = s.getLocalSocketAddress().toString();
@@ -141,6 +142,10 @@ class DataXceiver extends Receiver imple
 
   /** Return the datanode object. */
   DataNode getDataNode() {return datanode;}
+  
+  private OutputStream getOutputStream() throws IOException {
+    return socketOut;
+  }
 
   /**
    * Read/write data from/to the DataXceiverServer.
@@ -149,8 +154,31 @@ class DataXceiver extends Receiver imple
   public void run() {
     int opsProcessed = 0;
     Op op = null;
+    
     dataXceiverServer.childSockets.add(s);
+    
     try {
+      
+      InputStream input = socketIn;
+      if (dnConf.encryptDataTransfer) {
+        IOStreamPair encryptedStreams = null;
+        try {
+          encryptedStreams = DataTransferEncryptor.getEncryptedStreams(socketOut,
+              socketIn, datanode.blockPoolTokenSecretManager,
+              dnConf.encryptionAlgorithm);
+        } catch (InvalidMagicNumberException imne) {
+          LOG.info("Failed to read expected encryption handshake from client " +
+              "at " + s.getInetAddress() + ". Perhaps the client is running an " +
+              "older version of Hadoop which does not support encryption.");
+          return;
+        }
+        input = encryptedStreams.in;
+        socketOut = encryptedStreams.out;
+      }
+      input = new BufferedInputStream(input, HdfsConstants.SMALL_BUFFER_SIZE);
+      
+      super.initialize(new DataInputStream(input));
+      
       // We process requests in a loop, and stay around for a short timeout.
       // This optimistic behaviour allows the other end to reuse connections.
       // Setting keepalive timeout to 0 disable this behavior.
@@ -160,9 +188,9 @@ class DataXceiver extends Receiver imple
         try {
           if (opsProcessed != 0) {
             assert dnConf.socketKeepaliveTimeout > 0;
-            socketInputWrapper.setTimeout(dnConf.socketKeepaliveTimeout);
+            socketIn.setTimeout(dnConf.socketKeepaliveTimeout);
           } else {
-            socketInputWrapper.setTimeout(dnConf.socketTimeout);
+            socketIn.setTimeout(dnConf.socketTimeout);
           }
           op = readOp();
         } catch (InterruptedIOException ignored) {
@@ -215,8 +243,7 @@ class DataXceiver extends Receiver imple
       final long length) throws IOException {
     previousOpClientName = clientName;
 
-    OutputStream baseStream = NetUtils.getOutputStream(s, 
-        dnConf.socketWriteTimeout);
+    OutputStream baseStream = getOutputStream();
     DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
         baseStream, HdfsConstants.SMALL_BUFFER_SIZE));
     checkAccess(out, true, block, blockToken,
@@ -242,13 +269,12 @@ class DataXceiver extends Receiver imple
       } catch(IOException e) {
         String msg = "opReadBlock " + block + " received exception " + e; 
         LOG.info(msg);
-        sendResponse(s, ERROR, msg, dnConf.socketWriteTimeout);
+        sendResponse(ERROR, msg);
         throw e;
       }
       
       // send op status
-      writeSuccessWithChecksumInfo(blockSender,
-          getStreamWithTimeout(s, dnConf.socketWriteTimeout));
+      writeSuccessWithChecksumInfo(blockSender, new DataOutputStream(getOutputStream()));
 
       long read = blockSender.sendBlock(out, baseStream, null); // send data
 
@@ -347,7 +373,7 @@ class DataXceiver extends Receiver imple
     // reply to upstream datanode or client 
     final DataOutputStream replyOut = new DataOutputStream(
         new BufferedOutputStream(
-            NetUtils.getOutputStream(s, dnConf.socketWriteTimeout),
+            getOutputStream(),
             HdfsConstants.SMALL_BUFFER_SIZE));
     checkAccess(replyOut, isClient, block, blockToken,
         Op.WRITE_BLOCK, BlockTokenSecretManager.AccessMode.WRITE);
@@ -389,11 +415,23 @@ class DataXceiver extends Receiver imple
           NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);
           mirrorSock.setSoTimeout(timeoutValue);
           mirrorSock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
-          mirrorOut = new DataOutputStream(
-             new BufferedOutputStream(
-                         NetUtils.getOutputStream(mirrorSock, writeTimeout),
-                         HdfsConstants.SMALL_BUFFER_SIZE));
-          mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock));
+          
+          OutputStream unbufMirrorOut = NetUtils.getOutputStream(mirrorSock,
+              writeTimeout);
+          InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock);
+          if (dnConf.encryptDataTransfer) {
+            IOStreamPair encryptedStreams =
+                DataTransferEncryptor.getEncryptedStreams(
+                    unbufMirrorOut, unbufMirrorIn,
+                    datanode.blockPoolTokenSecretManager
+                        .generateDataEncryptionKey(block.getBlockPoolId()));
+            
+            unbufMirrorOut = encryptedStreams.out;
+            unbufMirrorIn = encryptedStreams.in;
+          }
+          mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut,
+              HdfsConstants.SMALL_BUFFER_SIZE));
+          mirrorIn = new DataInputStream(unbufMirrorIn);
 
           new Sender(mirrorOut).writeBlock(originalBlock, blockToken,
               clientname, targets, srcDataNode, stage, pipelineSize,
@@ -520,7 +558,7 @@ class DataXceiver extends Receiver imple
     updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk);
 
     final DataOutputStream out = new DataOutputStream(
-        NetUtils.getOutputStream(s, dnConf.socketWriteTimeout));
+        getOutputStream());
     try {
       datanode.transferReplicaForPipelineRecovery(blk, targets, clientName);
       writeResponse(Status.SUCCESS, null, out);
@@ -533,7 +571,7 @@ class DataXceiver extends Receiver imple
   public void blockChecksum(final ExtendedBlock block,
       final Token<BlockTokenIdentifier> blockToken) throws IOException {
     final DataOutputStream out = new DataOutputStream(
-        NetUtils.getOutputStream(s, dnConf.socketWriteTimeout));
+        getOutputStream());
     checkAccess(out, true, block, blockToken,
         Op.BLOCK_CHECKSUM, BlockTokenSecretManager.AccessMode.READ);
     updateCurrentThreadName("Reading metadata for block " + block);
@@ -593,7 +631,7 @@ class DataXceiver extends Receiver imple
         LOG.warn("Invalid access token in request from " + remoteAddress
             + " for OP_COPY_BLOCK for block " + block + " : "
             + e.getLocalizedMessage());
-        sendResponse(s, ERROR_ACCESS_TOKEN, "Invalid access token", dnConf.socketWriteTimeout);
+        sendResponse(ERROR_ACCESS_TOKEN, "Invalid access token");
         return;
       }
 
@@ -603,7 +641,7 @@ class DataXceiver extends Receiver imple
       String msg = "Not able to copy block " + block.getBlockId() + " to " 
       + s.getRemoteSocketAddress() + " because threads quota is exceeded."; 
       LOG.info(msg);
-      sendResponse(s, ERROR, msg, dnConf.socketWriteTimeout);
+      sendResponse(ERROR, msg);
       return;
     }
 
@@ -617,8 +655,7 @@ class DataXceiver extends Receiver imple
           null);
 
       // set up response stream
-      OutputStream baseStream = NetUtils.getOutputStream(
-          s, dnConf.socketWriteTimeout);
+      OutputStream baseStream = getOutputStream();
       reply = new DataOutputStream(new BufferedOutputStream(
           baseStream, HdfsConstants.SMALL_BUFFER_SIZE));
 
@@ -670,8 +707,7 @@ class DataXceiver extends Receiver imple
         LOG.warn("Invalid access token in request from " + remoteAddress
             + " for OP_REPLACE_BLOCK for block " + block + " : "
             + e.getLocalizedMessage());
-        sendResponse(s, ERROR_ACCESS_TOKEN, "Invalid access token",
-            dnConf.socketWriteTimeout);
+        sendResponse(ERROR_ACCESS_TOKEN, "Invalid access token");
         return;
       }
     }
@@ -680,7 +716,7 @@ class DataXceiver extends Receiver imple
       String msg = "Not able to receive block " + block.getBlockId() + " from " 
           + s.getRemoteSocketAddress() + " because threads quota is exceeded."; 
       LOG.warn(msg);
-      sendResponse(s, ERROR, msg, dnConf.socketWriteTimeout);
+      sendResponse(ERROR, msg);
       return;
     }
 
@@ -699,17 +735,29 @@ class DataXceiver extends Receiver imple
       NetUtils.connect(proxySock, proxyAddr, dnConf.socketTimeout);
       proxySock.setSoTimeout(dnConf.socketTimeout);
 
-      OutputStream baseStream = NetUtils.getOutputStream(proxySock, 
+      OutputStream unbufProxyOut = NetUtils.getOutputStream(proxySock,
           dnConf.socketWriteTimeout);
-      proxyOut = new DataOutputStream(new BufferedOutputStream(baseStream,
+      InputStream unbufProxyIn = NetUtils.getInputStream(proxySock);
+      if (dnConf.encryptDataTransfer) {
+        IOStreamPair encryptedStreams =
+            DataTransferEncryptor.getEncryptedStreams(
+                unbufProxyOut, unbufProxyIn,
+                datanode.blockPoolTokenSecretManager
+                    .generateDataEncryptionKey(block.getBlockPoolId()));
+        unbufProxyOut = encryptedStreams.out;
+        unbufProxyIn = encryptedStreams.in;
+      }
+      
+      proxyOut = new DataOutputStream(new BufferedOutputStream(unbufProxyOut, 
           HdfsConstants.SMALL_BUFFER_SIZE));
+      proxyReply = new DataInputStream(new BufferedInputStream(unbufProxyIn,
+          HdfsConstants.IO_FILE_BUFFER_SIZE));
 
       /* send request to the proxy */
       new Sender(proxyOut).copyBlock(block, blockToken);
 
       // receive the response from the proxy
-      proxyReply = new DataInputStream(new BufferedInputStream(
-          NetUtils.getInputStream(proxySock), HdfsConstants.IO_FILE_BUFFER_SIZE));
+      
       BlockOpResponseProto copyResponse = BlockOpResponseProto.parseFrom(
           HdfsProtoUtil.vintPrefixed(proxyReply));
 
@@ -762,7 +810,7 @@ class DataXceiver extends Receiver imple
       
       // send response back
       try {
-        sendResponse(s, opStatus, errMsg, dnConf.socketWriteTimeout);
+        sendResponse(opStatus, errMsg);
       } catch (IOException ioe) {
         LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress());
       }
@@ -781,20 +829,13 @@ class DataXceiver extends Receiver imple
 
   /**
    * Utility function for sending a response.
-   * @param s socket to write to
+   * 
    * @param opStatus status message to write
-   * @param timeout send timeout
-   **/
-  private static void sendResponse(Socket s, Status status, String message,
-      long timeout) throws IOException {
-    DataOutputStream reply = getStreamWithTimeout(s, timeout);
-    
-    writeResponse(status, message, reply);
-  }
-  
-  private static DataOutputStream getStreamWithTimeout(Socket s, long timeout)
-      throws IOException {
-    return new DataOutputStream(NetUtils.getOutputStream(s, timeout));
+   * @param message message to send to the client or other DN
+   */
+  private void sendResponse(Status status,
+      String message) throws IOException {
+    writeResponse(status, message, getOutputStream());
   }
 
   private static void writeResponse(Status status, String message, OutputStream out)

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java?rev=1370360&r1=1370359&r2=1370360&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java Tue Aug  7 16:46:03 2012
@@ -606,7 +606,7 @@ public class DatanodeJspHelper {
     try {
       JspHelper.streamBlockInAscii(new InetSocketAddress(req.getServerName(),
           datanodePort), bpid, blockId, blockToken, genStamp, blockSize,
-          startOffset, chunkSizeToView, out, conf);
+          startOffset, chunkSizeToView, out, conf, dfs.getDataEncryptionKey());
     } catch (Exception e) {
       out.print(e);
     }
@@ -699,7 +699,7 @@ public class DatanodeJspHelper {
 
     out.print("<textarea cols=\"100\" rows=\"25\" wrap=\"virtual\" style=\"width:100%\" READONLY>");
     JspHelper.streamBlockInAscii(addr, poolId, blockId, accessToken, genStamp,
-        blockSize, startOffset, chunkSizeToView, out, conf);
+        blockSize, startOffset, chunkSizeToView, out, conf, dfs.getDataEncryptionKey());
     out.print("</textarea>");
     dfs.close();
   }

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1370360&r1=1370359&r2=1370360&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue Aug  7 16:46:03 2012
@@ -25,6 +25,8 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY;
@@ -461,7 +463,8 @@ public class FSNamesystem implements Nam
           conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY, DFS_BYTES_PER_CHECKSUM_DEFAULT),
           conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT),
           (short) conf.getInt(DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT),
-          conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT));
+          conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT),
+          conf.getBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, DFS_ENCRYPT_DATA_TRANSFER_DEFAULT));
       
       this.maxFsObjects = conf.getLong(DFS_NAMENODE_MAX_OBJECTS_KEY, 
                                        DFS_NAMENODE_MAX_OBJECTS_DEFAULT);
@@ -2016,6 +2019,7 @@ public class FSNamesystem implements Nam
   
   void setBlockPoolId(String bpid) {
     blockPoolId = bpid;
+    blockManager.setBlockPoolId(blockPoolId);
   }
 
   /**

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java?rev=1370360&r1=1370359&r2=1370360&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java Tue Aug  7 16:46:03 2012
@@ -127,7 +127,7 @@ public class FileChecksumServlets {
             datanode, conf, getUGI(request, conf));
         final ClientProtocol nnproxy = dfs.getNamenode();
         final MD5MD5CRC32FileChecksum checksum = DFSClient.getFileChecksum(
-            path, nnproxy, socketFactory, socketTimeout);
+            path, nnproxy, socketFactory, socketTimeout, dfs.getDataEncryptionKey());
         MD5MD5CRC32FileChecksum.write(xml, checksum);
       } catch(IOException ioe) {
         writeXml(ioe, path, xml);

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1370360&r1=1370359&r2=1370360&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Tue Aug  7 16:46:03 2012
@@ -84,6 +84,7 @@ import org.apache.hadoop.hdfs.protocolPB
 import org.apache.hadoop.hdfs.protocolPB.RefreshAuthorizationPolicyProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdfs.protocolPB.RefreshUserMappingsProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.RefreshUserMappingsProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
@@ -1048,4 +1049,9 @@ class NameNodeRpcServer implements Namen
     }
     return clientMachine;
   }
+
+  @Override
+  public DataEncryptionKey getDataEncryptionKey() throws IOException {
+    return namesystem.getBlockManager().generateDataEncryptionKey();
+  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=1370360&r1=1370359&r2=1370360&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Tue Aug  7 16:46:03 2012
@@ -560,7 +560,8 @@ public class NamenodeFsck {
             block.getBlockId());
         blockReader = BlockReaderFactory.newBlockReader(
             conf, s, file, block, lblock
-            .getBlockToken(), 0, -1);
+            .getBlockToken(), 0, -1,
+            namenode.getRpcServer().getDataEncryptionKey());
         
       }  catch (IOException ex) {
         // Put chosen node into dead list, continue

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto?rev=1370360&r1=1370359&r2=1370360&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto Tue Aug  7 16:46:03 2012
@@ -441,6 +441,12 @@ message SetBalancerBandwidthRequestProto
 message SetBalancerBandwidthResponseProto { // void response
 }
 
+message GetDataEncryptionKeyRequestProto { // no parameters
+}
+
+message GetDataEncryptionKeyResponseProto {
+  required DataEncryptionKeyProto dataEncryptionKey = 1;
+}
 
 service ClientNamenodeProtocol {
   rpc getBlockLocations(GetBlockLocationsRequestProto)
@@ -511,6 +517,8 @@ service ClientNamenodeProtocol {
       returns(RenewDelegationTokenResponseProto);
   rpc cancelDelegationToken(CancelDelegationTokenRequestProto)
       returns(CancelDelegationTokenResponseProto);
-  rpc  setBalancerBandwidth(SetBalancerBandwidthRequestProto)
+  rpc setBalancerBandwidth(SetBalancerBandwidthRequestProto)
       returns(SetBalancerBandwidthResponseProto);
+  rpc getDataEncryptionKey(GetDataEncryptionKeyRequestProto)
+      returns(GetDataEncryptionKeyResponseProto);
 }

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto?rev=1370360&r1=1370359&r2=1370360&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto Tue Aug  7 16:46:03 2012
@@ -25,6 +25,17 @@ option java_generate_equals_and_hash = t
 
 import "hdfs.proto";
 
+message DataTransferEncryptorMessageProto {
+  enum DataTransferEncryptorStatus {
+    SUCCESS = 0;
+    ERROR_UNKNOWN_KEY = 1;
+    ERROR = 2;
+  }
+  required DataTransferEncryptorStatus status = 1;
+  optional bytes payload = 2;
+  optional string message = 3;
+}
+
 message BaseHeaderProto {
   required ExtendedBlockProto block = 1;
   optional BlockTokenIdentifierProto token = 2;

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto?rev=1370360&r1=1370359&r2=1370360&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto Tue Aug  7 16:46:03 2012
@@ -126,7 +126,16 @@ message LocatedBlockProto {
                                         // their locations are not part of this object
 
   required BlockTokenIdentifierProto blockToken = 5;
- }
+}
+
+message DataEncryptionKeyProto {
+  required uint32 keyId = 1;
+  required string blockPoolId = 2;
+  required bytes nonce = 3;
+  required bytes encryptionKey = 4;
+  required uint64 expiryDate = 5;
+  optional string encryptionAlgorithm = 6;
+}
 
 
 /**
@@ -178,6 +187,7 @@ message FsServerDefaultsProto {
   required uint32 writePacketSize = 3;
   required uint32 replication = 4; // Actually a short - only 16 bits used
   required uint32 fileBufferSize = 5;
+  optional bool encryptDataTransfer = 6 [default = false];
 }
 
 

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1370360&r1=1370359&r2=1370360&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Tue Aug  7 16:46:03 2012
@@ -1014,4 +1014,25 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.encrypt.data.transfer</name>
+  <value>false</value>
+  <description>
+    Whether or not actual block data that is read/written from/to HDFS should
+    be encrypted on the wire. This only needs to be set on the NN and DNs,
+    clients will deduce this automatically.
+  </description>
+</property>
+
+<property>
+  <name>dfs.encrypt.data.transfer.algorithm</name>
+  <value></value>
+  <description>
+    This value may be set to either "3des" or "rc4". If nothing is set, then
+    the configured JCE default on the system is used (usually 3DES.) It is
+    widely believed that 3DES is more cryptographically secure, but RC4 is
+    substantially faster.
+  </description>
+</property>
+
 </configuration>

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java?rev=1370360&r1=1370359&r2=1370360&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java Tue Aug  7 16:46:03 2012
@@ -155,7 +155,7 @@ public class BlockReaderTestUtil {
       testBlock.getBlockToken(), 
       offset, lenToRead,
       conf.getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
-      true, "");
+      true, "", null, null);
   }
 
   /**

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java?rev=1370360&r1=1370359&r2=1370360&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java Tue Aug  7 16:46:03 2012
@@ -60,7 +60,7 @@ public class TestClientBlockVerification
     RemoteBlockReader2 reader = (RemoteBlockReader2)spy(
         util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
     util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true);
-    verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK);
+    verify(reader).sendReadResult(Status.CHECKSUM_OK);
     reader.close();
   }
 
@@ -75,7 +75,7 @@ public class TestClientBlockVerification
 
     // We asked the blockreader for the whole file, and only read
     // half of it, so no CHECKSUM_OK
-    verify(reader, never()).sendReadResult(reader.dnSock, Status.CHECKSUM_OK);
+    verify(reader, never()).sendReadResult(Status.CHECKSUM_OK);
     reader.close();
   }
 
@@ -91,7 +91,7 @@ public class TestClientBlockVerification
         util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024 / 2));
     // And read half the file
     util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true);
-    verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK);
+    verify(reader).sendReadResult(Status.CHECKSUM_OK);
     reader.close();
   }
 
@@ -110,7 +110,7 @@ public class TestClientBlockVerification
         RemoteBlockReader2 reader = (RemoteBlockReader2)spy(
             util.getBlockReader(testBlock, startOffset, length));
         util.readAndCheckEOS(reader, length, true);
-        verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK);
+        verify(reader).sendReadResult(Status.CHECKSUM_OK);
         reader.close();
       }
     }

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java?rev=1370360&r1=1370359&r2=1370360&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java Tue Aug  7 16:46:03 2012
@@ -168,13 +168,13 @@ public class TestConnCache {
 
     // Insert a socket to the NN
     Socket nnSock = new Socket(nnAddr.getAddress(), nnAddr.getPort());
-    cache.put(nnSock);
-    assertSame("Read the write", nnSock, cache.get(nnAddr));
-    cache.put(nnSock);
+    cache.put(nnSock, null);
+    assertSame("Read the write", nnSock, cache.get(nnAddr).sock);
+    cache.put(nnSock, null);
 
     // Insert DN socks
     for (Socket dnSock : dnSockets) {
-      cache.put(dnSock);
+      cache.put(dnSock, null);
     }
 
     assertEquals("NN socket evicted", null, cache.get(nnAddr));
@@ -182,7 +182,7 @@ public class TestConnCache {
 
     // Lookup the DN socks
     for (Socket dnSock : dnSockets) {
-      assertEquals("Retrieve cached sockets", dnSock, cache.get(dnAddr));
+      assertEquals("Retrieve cached sockets", dnSock, cache.get(dnAddr).sock);
       dnSock.close();
     }
 

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java?rev=1370360&r1=1370359&r2=1370360&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java Tue Aug  7 16:46:03 2012
@@ -113,7 +113,7 @@ public class TestDataTransferKeepalive {
     
     // Take it out of the cache - reading should
     // give an EOF.
-    Socket s = dfsClient.socketCache.get(dnAddr);
+    Socket s = dfsClient.socketCache.get(dnAddr).sock;
     assertNotNull(s);
     assertEquals(-1, NetUtils.getInputStream(s).read());
   }

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java?rev=1370360&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java Tue Aug  7 16:46:03 2012
@@ -0,0 +1,459 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestEncryptedTransfer {
+  
+  private static final Log LOG = LogFactory.getLog(TestEncryptedTransfer.class);
+  
+  private static final String PLAIN_TEXT = "this is very secret plain text";
+  private static final Path TEST_PATH = new Path("/non-encrypted-file");
+  
+  private static void setEncryptionConfigKeys(Configuration conf) {
+    conf.setBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, true);
+    conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+  }
+  
+  // Unset DFS_ENCRYPT_DATA_TRANSFER_KEY and DFS_DATA_ENCRYPTION_ALGORITHM_KEY
+  // on the client side to ensure that clients will detect this setting
+  // automatically from the NN.
+  private static FileSystem getFileSystem(Configuration conf) throws IOException {
+    Configuration localConf = new Configuration(conf);
+    localConf.setBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, false);
+    localConf.unset(DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
+    return FileSystem.get(localConf);
+  }
+
+  @Test
+  public void testEncryptedRead() throws IOException {
+    MiniDFSCluster cluster = null;
+    try {
+      Configuration conf = new Configuration();
+      cluster = new MiniDFSCluster.Builder(conf).build();
+      
+      FileSystem fs = getFileSystem(conf);
+      writeTestDataToFile(fs);
+      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+      FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
+      fs.close();
+      cluster.shutdown();
+      
+      setEncryptionConfigKeys(conf);
+      
+      cluster = new MiniDFSCluster.Builder(conf)
+          .manageDataDfsDirs(false)
+          .manageNameDfsDirs(false)
+          .format(false)
+          .startupOption(StartupOption.REGULAR)
+          .build();
+      
+      fs = getFileSystem(conf);
+      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+      assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
+      fs.close();
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+  
+  @Test
+  public void testEncryptedReadWithRC4() throws IOException {
+    MiniDFSCluster cluster = null;
+    try {
+      Configuration conf = new Configuration();
+      cluster = new MiniDFSCluster.Builder(conf).build();
+      
+      FileSystem fs = getFileSystem(conf);
+      writeTestDataToFile(fs);
+      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+      FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
+      fs.close();
+      cluster.shutdown();
+      
+      setEncryptionConfigKeys(conf);
+      // It'll use 3DES by default, but we set it to rc4 here.
+      conf.set(DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY, "rc4");
+      
+      cluster = new MiniDFSCluster.Builder(conf)
+          .manageDataDfsDirs(false)
+          .manageNameDfsDirs(false)
+          .format(false)
+          .startupOption(StartupOption.REGULAR)
+          .build();
+      
+      fs = getFileSystem(conf);
+      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+      assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
+      fs.close();
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+  
+  @Test
+  public void testEncryptedReadAfterNameNodeRestart() throws IOException {
+    MiniDFSCluster cluster = null;
+    try {
+      Configuration conf = new Configuration();
+      cluster = new MiniDFSCluster.Builder(conf).build();
+      
+      FileSystem fs = getFileSystem(conf);
+      writeTestDataToFile(fs);
+      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+      FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
+      fs.close();
+      cluster.shutdown();
+      
+      setEncryptionConfigKeys(conf);
+      
+      cluster = new MiniDFSCluster.Builder(conf)
+          .manageDataDfsDirs(false)
+          .manageNameDfsDirs(false)
+          .format(false)
+          .startupOption(StartupOption.REGULAR)
+          .build();
+      
+      fs = getFileSystem(conf);
+      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+      assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
+      fs.close();
+      
+      cluster.restartNameNode();
+      fs = getFileSystem(conf);
+      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+      assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
+      fs.close();
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+  
+  @Test
+  public void testClientThatDoesNotSupportEncryption() throws IOException {
+    MiniDFSCluster cluster = null;
+    try {
+      Configuration conf = new Configuration();
+      cluster = new MiniDFSCluster.Builder(conf).build();
+      
+      FileSystem fs = getFileSystem(conf);
+      writeTestDataToFile(fs);
+      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+      fs.close();
+      cluster.shutdown();
+      
+      setEncryptionConfigKeys(conf);
+      
+      cluster = new MiniDFSCluster.Builder(conf)
+          .manageDataDfsDirs(false)
+          .manageNameDfsDirs(false)
+          .format(false)
+          .startupOption(StartupOption.REGULAR)
+          .build();
+      
+      
+      fs = getFileSystem(conf);
+      DFSClient client = DFSClientAdapter.getDFSClient((DistributedFileSystem) fs);
+      DFSClient spyClient = Mockito.spy(client);
+      Mockito.doReturn(false).when(spyClient).shouldEncryptData();
+      DFSClientAdapter.setDFSClient((DistributedFileSystem) fs, spyClient);
+      
+      LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
+          LogFactory.getLog(DataNode.class));
+      try {
+        assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+        fail("Should not have been able to read without encryption enabled.");
+      } catch (IOException ioe) {
+        GenericTestUtils.assertExceptionContains("Could not obtain block:",
+            ioe);
+      } finally {
+        logs.stopCapturing();
+      }
+      fs.close();
+      
+      GenericTestUtils.assertMatches(logs.getOutput(),
+          "Failed to read expected encryption handshake from client at");
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+  
+  @Test
+  public void testLongLivedReadClientAfterRestart() throws IOException {
+    MiniDFSCluster cluster = null;
+    try {
+      Configuration conf = new Configuration();
+      cluster = new MiniDFSCluster.Builder(conf).build();
+      
+      FileSystem fs = getFileSystem(conf);
+      writeTestDataToFile(fs);
+      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+      FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
+      fs.close();
+      cluster.shutdown();
+      
+      setEncryptionConfigKeys(conf);
+      
+      cluster = new MiniDFSCluster.Builder(conf)
+          .manageDataDfsDirs(false)
+          .manageNameDfsDirs(false)
+          .format(false)
+          .startupOption(StartupOption.REGULAR)
+          .build();
+      
+      fs = getFileSystem(conf);
+      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+      assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
+      
+      // Restart the NN and DN, after which the client's encryption key will no
+      // longer be valid.
+      cluster.restartNameNode();
+      assertTrue(cluster.restartDataNode(0));
+      
+      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+      assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
+      
+      fs.close();
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+  
+  @Test
+  public void testLongLivedWriteClientAfterRestart() throws IOException {
+    MiniDFSCluster cluster = null;
+    try {
+      Configuration conf = new Configuration();
+      setEncryptionConfigKeys(conf);
+      cluster = new MiniDFSCluster.Builder(conf).build();
+      
+      FileSystem fs = getFileSystem(conf);
+      
+      writeTestDataToFile(fs);
+      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+      
+      // Restart the NN and DN, after which the client's encryption key will no
+      // longer be valid.
+      cluster.restartNameNode();
+      assertTrue(cluster.restartDataNodes());
+      cluster.waitActive();
+      
+      writeTestDataToFile(fs);
+      assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+      
+      fs.close();
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+  
+  @Test
+  public void testLongLivedClient() throws IOException, InterruptedException {
+    MiniDFSCluster cluster = null;
+    try {
+      Configuration conf = new Configuration();
+      cluster = new MiniDFSCluster.Builder(conf).build();
+      
+      FileSystem fs = getFileSystem(conf);
+      writeTestDataToFile(fs);
+      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+      FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
+      fs.close();
+      cluster.shutdown();
+      
+      setEncryptionConfigKeys(conf);
+      
+      cluster = new MiniDFSCluster.Builder(conf)
+          .manageDataDfsDirs(false)
+          .manageNameDfsDirs(false)
+          .format(false)
+          .startupOption(StartupOption.REGULAR)
+          .build();
+      
+      BlockTokenSecretManager btsm = cluster.getNamesystem().getBlockManager()
+          .getBlockTokenSecretManager();
+      btsm.setKeyUpdateIntervalForTesting(2 * 1000);
+      btsm.setTokenLifetime(2 * 1000);
+      btsm.clearAllKeysForTesting();
+      
+      fs = getFileSystem(conf);
+      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+      assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
+      
+      // Sleep for 15 seconds, after which the encryption key will no longer be
+      // valid. It needs to be a few multiples of the block token lifetime,
+      // since several block tokens are valid at any given time (the current
+      // and the last two, by default.)
+      LOG.info("Sleeping so that encryption keys expire...");
+      Thread.sleep(15 * 1000);
+      LOG.info("Done sleeping.");
+      
+      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+      assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
+      
+      fs.close();
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+  
+  @Test
+  public void testEncryptedWriteWithOneDn() throws IOException {
+    testEncryptedWrite(1);
+  }
+  
+  @Test
+  public void testEncryptedWriteWithTwoDns() throws IOException {
+    testEncryptedWrite(2);
+  }
+  
+  @Test
+  public void testEncryptedWriteWithMultipleDns() throws IOException {
+    testEncryptedWrite(10);
+  }
+  
+  private void testEncryptedWrite(int numDns) throws IOException {
+    MiniDFSCluster cluster = null;
+    try {
+      Configuration conf = new Configuration();
+      setEncryptionConfigKeys(conf);
+      
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDns).build();
+      
+      FileSystem fs = getFileSystem(conf);
+      writeTestDataToFile(fs);
+      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+      fs.close();
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+  
+  @Test
+  public void testEncryptedAppend() throws IOException {
+    MiniDFSCluster cluster = null;
+    try {
+      Configuration conf = new Configuration();
+      setEncryptionConfigKeys(conf);
+      
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+      
+      FileSystem fs = getFileSystem(conf);
+      
+      writeTestDataToFile(fs);
+      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+      
+      writeTestDataToFile(fs);
+      assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+      
+      fs.close();
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+  
+  @Test
+  public void testEncryptedAppendRequiringBlockTransfer() throws IOException {
+    MiniDFSCluster cluster = null;
+    try {
+      Configuration conf = new Configuration();
+      setEncryptionConfigKeys(conf);
+      
+      // start up 4 DNs
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
+      
+      FileSystem fs = getFileSystem(conf);
+      
+      // Create a file with replication 3, so its block is on 3 / 4 DNs.
+      writeTestDataToFile(fs);
+      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+      
+      // Shut down one of the DNs holding a block replica.
+      FSDataInputStream in = fs.open(TEST_PATH);
+      List<LocatedBlock> locatedBlocks = DFSTestUtil.getAllBlocks(in);
+      in.close();
+      assertEquals(1, locatedBlocks.size());
+      assertEquals(3, locatedBlocks.get(0).getLocations().length);
+      DataNode dn = cluster.getDataNode(locatedBlocks.get(0).getLocations()[0].getIpcPort());
+      dn.shutdown();
+      
+      // Reopen the file for append, which will need to add another DN to the
+      // pipeline and in doing so trigger a block transfer.
+      writeTestDataToFile(fs);
+      assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+      
+      fs.close();
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+  
+  private static void writeTestDataToFile(FileSystem fs) throws IOException {
+    OutputStream out = null;
+    if (!fs.exists(TEST_PATH)) {
+      out = fs.create(TEST_PATH);
+    } else {
+      out = fs.append(TEST_PATH);
+    }
+    out.write(PLAIN_TEXT.getBytes());
+    out.close();
+  }
+}

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java?rev=1370360&r1=1370359&r2=1370360&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java Tue Aug  7 16:46:03 2012
@@ -162,7 +162,7 @@ public class TestBlockToken {
   public void testWritable() throws Exception {
     TestWritable.testWritable(new BlockTokenIdentifier());
     BlockTokenSecretManager sm = new BlockTokenSecretManager(
-        blockKeyUpdateInterval, blockTokenLifetime, 0);
+        blockKeyUpdateInterval, blockTokenLifetime, 0, "fake-pool", null);
     TestWritable.testWritable(generateTokenId(sm, block1,
         EnumSet.allOf(BlockTokenSecretManager.AccessMode.class)));
     TestWritable.testWritable(generateTokenId(sm, block2,
@@ -201,9 +201,9 @@ public class TestBlockToken {
   @Test
   public void testBlockTokenSecretManager() throws Exception {
     BlockTokenSecretManager masterHandler = new BlockTokenSecretManager(
-        blockKeyUpdateInterval, blockTokenLifetime, 0);
+        blockKeyUpdateInterval, blockTokenLifetime, 0, "fake-pool", null);
     BlockTokenSecretManager slaveHandler = new BlockTokenSecretManager(
-        blockKeyUpdateInterval, blockTokenLifetime);
+        blockKeyUpdateInterval, blockTokenLifetime, "fake-pool", null);
     ExportedBlockKeys keys = masterHandler.exportKeys();
     slaveHandler.addKeys(keys);
     tokenGenerationAndVerification(masterHandler, slaveHandler);
@@ -238,7 +238,7 @@ public class TestBlockToken {
   @Test
   public void testBlockTokenRpc() throws Exception {
     BlockTokenSecretManager sm = new BlockTokenSecretManager(
-        blockKeyUpdateInterval, blockTokenLifetime, 0);
+        blockKeyUpdateInterval, blockTokenLifetime, 0, "fake-pool", null);
     Token<BlockTokenIdentifier> token = sm.generateToken(block3,
         EnumSet.allOf(BlockTokenSecretManager.AccessMode.class));
 
@@ -273,7 +273,7 @@ public class TestBlockToken {
   public void testBlockTokenRpcLeak() throws Exception {
     Assume.assumeTrue(FD_DIR.exists());
     BlockTokenSecretManager sm = new BlockTokenSecretManager(
-        blockKeyUpdateInterval, blockTokenLifetime, 0);
+        blockKeyUpdateInterval, blockTokenLifetime, 0, "fake-pool", null);
     Token<BlockTokenIdentifier> token = sm.generateToken(block3,
         EnumSet.allOf(BlockTokenSecretManager.AccessMode.class));
 
@@ -342,9 +342,9 @@ public class TestBlockToken {
     for (int i = 0; i < 10; i++) {
       String bpid = Integer.toString(i);
       BlockTokenSecretManager masterHandler = new BlockTokenSecretManager(
-          blockKeyUpdateInterval, blockTokenLifetime, 0);
+          blockKeyUpdateInterval, blockTokenLifetime, 0, "fake-pool", null);
       BlockTokenSecretManager slaveHandler = new BlockTokenSecretManager(
-          blockKeyUpdateInterval, blockTokenLifetime);
+          blockKeyUpdateInterval, blockTokenLifetime, "fake-pool", null);
       bpMgr.addBlockPool(bpid, slaveHandler);
 
       ExportedBlockKeys keys = masterHandler.exportKeys();