You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by cn...@apache.org on 2014/07/14 20:28:03 UTC

svn commit: r1610479 [2/2] - in /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/net/ src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ src/main/ja...

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1610479&r1=1610478&r2=1610479&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Mon Jul 14 18:28:02 2014
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hdfs.server.balancer;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
 import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
 
 import java.io.BufferedInputStream;
@@ -62,9 +64,11 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -202,6 +206,7 @@ public class Balancer {
   
   private final NameNodeConnector nnc;
   private final BalancingPolicy policy;
+  private final SaslDataTransferClient saslClient;
   private final double threshold;
   
   // all data node lists
@@ -352,19 +357,18 @@ public class Balancer {
         
         OutputStream unbufOut = sock.getOutputStream();
         InputStream unbufIn = sock.getInputStream();
-        if (nnc.getDataEncryptionKey() != null) {
-          IOStreamPair encryptedStreams =
-              DataTransferEncryptor.getEncryptedStreams(
-                  unbufOut, unbufIn, nnc.getDataEncryptionKey());
-          unbufOut = encryptedStreams.out;
-          unbufIn = encryptedStreams.in;
-        }
+        ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock());
+        Token<BlockTokenIdentifier> accessToken = nnc.getAccessToken(eb);
+        IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
+          unbufIn, nnc, accessToken, target.datanode);
+        unbufOut = saslStreams.out;
+        unbufIn = saslStreams.in;
         out = new DataOutputStream(new BufferedOutputStream(unbufOut,
             HdfsConstants.IO_FILE_BUFFER_SIZE));
         in = new DataInputStream(new BufferedInputStream(unbufIn,
             HdfsConstants.IO_FILE_BUFFER_SIZE));
         
-        sendRequest(out);
+        sendRequest(out, eb, accessToken);
         receiveResponse(in);
         bytesMoved.addAndGet(block.getNumBytes());
         LOG.info("Successfully moved " + this);
@@ -395,9 +399,8 @@ public class Balancer {
     }
     
     /* Send a block replace request to the output stream*/
-    private void sendRequest(DataOutputStream out) throws IOException {
-      final ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock());
-      final Token<BlockTokenIdentifier> accessToken = nnc.getAccessToken(eb);
+    private void sendRequest(DataOutputStream out, ExtendedBlock eb,
+        Token<BlockTokenIdentifier> accessToken) throws IOException {
       new Sender(out).replaceBlock(eb, accessToken,
           source.getStorageID(), proxySource.getDatanode());
     }
@@ -876,6 +879,12 @@ public class Balancer {
     this.maxConcurrentMovesPerNode =
         conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
         DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
+    this.saslClient = new SaslDataTransferClient(
+      DataTransferSaslUtil.getSaslPropertiesResolver(conf),
+      TrustedChannelResolver.getInstance(conf),
+      conf.getBoolean(
+        IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
+        IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT));
   }
   
   /* Given a data node set, build a network topology and decide

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java?rev=1610479&r1=1610478&r2=1610479&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java Mon Jul 14 18:28:02 2014
@@ -34,8 +34,8 @@ import org.apache.hadoop.hdfs.NameNodePr
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
-import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
@@ -50,7 +50,7 @@ import org.apache.hadoop.util.Daemon;
  * The class provides utilities for {@link Balancer} to access a NameNode
  */
 @InterfaceAudience.Private
-class NameNodeConnector {
+class NameNodeConnector implements DataEncryptionKeyFactory {
   private static final Log LOG = Balancer.LOG;
   private static final Path BALANCER_ID_PATH = new Path("/system/balancer.id");
   private static final int MAX_NOT_CHANGED_ITERATIONS = 5;
@@ -72,7 +72,6 @@ class NameNodeConnector {
   private BlockTokenSecretManager blockTokenSecretManager;
   private Daemon keyupdaterthread; // AccessKeyUpdater thread
   private DataEncryptionKey encryptionKey;
-  private final TrustedChannelResolver trustedChannelResolver;
 
   NameNodeConnector(URI nameNodeUri,
       Configuration conf) throws IOException {
@@ -122,7 +121,6 @@ class NameNodeConnector {
     if (out == null) {
       throw new IOException("Another balancer is running");
     }
-    this.trustedChannelResolver = TrustedChannelResolver.getInstance(conf);
   }
 
   boolean shouldContinue(long dispatchBlockMoveBytes) {
@@ -154,10 +152,10 @@ class NameNodeConnector {
           BlockTokenSecretManager.AccessMode.COPY));
     }
   }
-  
-  DataEncryptionKey getDataEncryptionKey()
-      throws IOException {
-    if (encryptDataTransfer && !this.trustedChannelResolver.isTrusted()) {
+
+  @Override
+  public DataEncryptionKey newDataEncryptionKey() {
+    if (encryptDataTransfer) {
       synchronized (this) {
         if (encryptionKey == null) {
           encryptionKey = blockTokenSecretManager.generateDataEncryptionKey();

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1610479&r1=1610478&r2=1610479&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Mon Jul 14 18:28:02 2014
@@ -58,8 +58,8 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
@@ -211,14 +211,16 @@ public class JspHelper {
   }
 
   public static void streamBlockInAscii(InetSocketAddress addr, String poolId,
-      long blockId, Token<BlockTokenIdentifier> blockToken, long genStamp,
+      long blockId, final Token<BlockTokenIdentifier> blockToken, long genStamp,
       long blockSize, long offsetIntoBlock, long chunkSizeToView,
       JspWriter out, final Configuration conf, DFSClient.Conf dfsConf,
-      final DataEncryptionKey encryptionKey)
+      final DFSClient dfs, final SaslDataTransferClient saslClient)
           throws IOException {
     if (chunkSizeToView == 0) return;
     int amtToRead = (int)Math.min(chunkSizeToView, blockSize - offsetIntoBlock);
       
+    DatanodeID datanodeId = new DatanodeID(addr.getAddress().getHostAddress(),
+      addr.getHostName(), poolId, addr.getPort(), 0, 0, 0);
     BlockReader blockReader = new BlockReaderFactory(dfsConf).
       setInetSocketAddress(addr).
       setBlock(new ExtendedBlock(poolId, blockId, 0, genStamp)).
@@ -229,21 +231,21 @@ public class JspHelper {
       setVerifyChecksum(true).
       setClientName("JspHelper").
       setClientCacheContext(ClientContext.getFromConf(conf)).
-      setDatanodeInfo(new DatanodeInfo(
-          new DatanodeID(addr.getAddress().getHostAddress(),
-              addr.getHostName(), poolId, addr.getPort(), 0, 0, 0))).
+      setDatanodeInfo(new DatanodeInfo(datanodeId)).
       setCachingStrategy(CachingStrategy.newDefaultStrategy()).
       setConfiguration(conf).
       setRemotePeerFactory(new RemotePeerFactory() {
         @Override
-        public Peer newConnectedPeer(InetSocketAddress addr)
+        public Peer newConnectedPeer(InetSocketAddress addr,
+            Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
             throws IOException {
           Peer peer = null;
           Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
           try {
             sock.connect(addr, HdfsServerConstants.READ_TIMEOUT);
             sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
-            peer = TcpPeerServer.peerFromSocketAndKey(sock, encryptionKey);
+            peer = TcpPeerServer.peerFromSocketAndKey(saslClient, sock, dfs,
+                blockToken, datanodeId);
           } finally {
             if (peer == null) {
               IOUtils.closeSocket(sock);

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java?rev=1610479&r1=1610478&r2=1610479&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java Mon Jul 14 18:28:02 2014
@@ -52,7 +52,9 @@ import static org.apache.hadoop.hdfs.DFS
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.security.SaslPropertiesResolver;
 
 /**
  * Simple class encapsulating all of the configuration that the DataNode
@@ -86,6 +88,7 @@ public class DNConf {
   
   final String minimumNameNodeVersion;
   final String encryptionAlgorithm;
+  final SaslPropertiesResolver saslPropsResolver;
   final TrustedChannelResolver trustedChannelResolver;
   
   final long xceiverStopTimeout;
@@ -168,6 +171,8 @@ public class DNConf {
         DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
     this.encryptionAlgorithm = conf.get(DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
     this.trustedChannelResolver = TrustedChannelResolver.getInstance(conf);
+    this.saslPropsResolver = DataTransferSaslUtil.getSaslPropertiesResolver(
+      conf);
     
     this.xceiverStopTimeout = conf.getLong(
         DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY,
@@ -186,7 +191,26 @@ public class DNConf {
   String getMinimumNameNodeVersion() {
     return this.minimumNameNodeVersion;
   }
-  
+
+  /**
+   * Returns true if encryption is enabled for DataTransferProtocol.
+   *
+   * @return boolean true if encryption is enabled for DataTransferProtocol
+   */
+  public boolean getEncryptDataTransfer() {
+    return encryptDataTransfer;
+  }
+
+  /**
+   * Returns encryption algorithm configured for DataTransferProtocol, or null
+   * if not configured.
+   *
+   * @return encryption algorithm configured for DataTransferProtocol
+   */
+  public String getEncryptionAlgorithm() {
+    return encryptionAlgorithm;
+  }
+
   public long getXceiverStopTimeout() {
     return xceiverStopTimeout;
   }
@@ -194,4 +218,24 @@ public class DNConf {
   public long getMaxLockedMemory() {
     return maxLockedMemory;
   }
+
+  /**
+   * Returns the SaslPropertiesResolver configured for use with
+   * DataTransferProtocol, or null if not configured.
+   *
+   * @return SaslPropertiesResolver configured for use with DataTransferProtocol
+   */
+  public SaslPropertiesResolver getSaslPropsResolver() {
+    return saslPropsResolver;
+  }
+
+  /**
+   * Returns the TrustedChannelResolver configured for use with
+   * DataTransferProtocol, or null if not configured.
+   *
+   * @return TrustedChannelResolver configured for use with DataTransferProtocol
+   */
+  public TrustedChannelResolver getTrustedChannelResolver() {
+    return trustedChannelResolver;
+  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1610479&r1=1610478&r2=1610479&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Mon Jul 14 18:28:02 2014
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -41,6 +44,9 @@ import org.apache.hadoop.hdfs.net.Domain
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.protocol.datatransfer.*;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferServer;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ClientDatanodeProtocolService;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
@@ -227,6 +233,8 @@ public class DataNode extends Configured
   private final List<String> usersWithLocalPathAccess;
   private final boolean connectToDnViaHostname;
   ReadaheadPool readaheadPool;
+  SaslDataTransferClient saslClient;
+  SaslDataTransferServer saslServer;
   private final boolean getHdfsBlockLocationsEnabled;
   private ObjectName dataNodeInfoBeanName;
   private Thread checkDiskErrorThread = null;
@@ -729,15 +737,10 @@ public class DataNode extends Configured
    */
   void startDataNode(Configuration conf, 
                      List<StorageLocation> dataDirs,
-                    // DatanodeProtocol namenode,
                      SecureResources resources
                      ) throws IOException {
-    if(UserGroupInformation.isSecurityEnabled() && resources == null) {
-      if (!conf.getBoolean("ignore.secure.ports.for.testing", false)) {
-        throw new RuntimeException("Cannot start secure cluster without "
-            + "privileged resources.");
-      }
-    }
+
+    checkSecureConfig(conf, resources);
 
     // settings global for all BPs in the Data Node
     this.secureResources = resources;
@@ -797,6 +800,55 @@ public class DataNode extends Configured
     // Create the ReadaheadPool from the DataNode context so we can
     // exit without having to explicitly shutdown its thread pool.
     readaheadPool = ReadaheadPool.getInstance();
+    saslClient = new SaslDataTransferClient(dnConf.saslPropsResolver,
+      dnConf.trustedChannelResolver,
+      conf.getBoolean(
+        IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
+        IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT));
+    saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
+  }
+
+  /**
+   * Checks if the DataNode has a secure configuration if security is enabled.
+   * There are 2 possible configurations that are considered secure:
+   * 1. The server has bound to privileged ports for RPC and HTTP via
+   *   SecureDataNodeStarter.
+   * 2. The configuration enables SASL on DataTransferProtocol and HTTPS (no
+   *   plain HTTP) for the HTTP server.  The SASL handshake guarantees
+   *   authentication of the RPC server before a client transmits a secret, such
+   *   as a block access token.  Similarly, SSL guarantees authentication of the
+   *   HTTP server before a client transmits a secret, such as a delegation
+   *   token.
+   * It is not possible to run with both privileged ports and SASL on
+   * DataTransferProtocol.  For backwards-compatibility, the connection logic
+   * must check if the target port is a privileged port, and if so, skip the
+   * SASL handshake.
+   *
+   * @param conf Configuration to check
+   * @param resources SecureResources obtained for DataNode
+   * @throws RuntimeException if security enabled, but configuration is insecure
+   */
+  private static void checkSecureConfig(Configuration conf,
+      SecureResources resources) throws RuntimeException {
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      return;
+    }
+    String dataTransferProtection = conf.get(DFS_DATA_TRANSFER_PROTECTION_KEY);
+    if (resources != null && dataTransferProtection == null) {
+      return;
+    }
+    if (conf.getBoolean("ignore.secure.ports.for.testing", false)) {
+      return;
+    }
+    if (dataTransferProtection != null &&
+        DFSUtil.getHttpPolicy(conf) == HttpConfig.Policy.HTTPS_ONLY &&
+        resources == null) {
+      return;
+    }
+    throw new RuntimeException("Cannot start secure DataNode without " +
+      "configuring either privileged resources or SASL RPC data transfer " +
+      "protection and SSL for HTTP.  Using privileged resources in " +
+      "combination with SASL RPC data transfer protection is not supported.");
   }
   
   public static String generateUuid() {
@@ -1630,20 +1682,25 @@ public class DataNode extends Configured
         NetUtils.connect(sock, curTarget, dnConf.socketTimeout);
         sock.setSoTimeout(targets.length * dnConf.socketTimeout);
 
+        //
+        // Header info
+        //
+        Token<BlockTokenIdentifier> accessToken = BlockTokenSecretManager.DUMMY_TOKEN;
+        if (isBlockTokenEnabled) {
+          accessToken = blockPoolTokenSecretManager.generateToken(b, 
+              EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE));
+        }
+
         long writeTimeout = dnConf.socketWriteTimeout + 
                             HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
         OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
         InputStream unbufIn = NetUtils.getInputStream(sock);
-        if (dnConf.encryptDataTransfer && 
-            !dnConf.trustedChannelResolver.isTrusted(sock.getInetAddress())) {
-          IOStreamPair encryptedStreams =
-              DataTransferEncryptor.getEncryptedStreams(
-                  unbufOut, unbufIn,
-                  blockPoolTokenSecretManager.generateDataEncryptionKey(
-                      b.getBlockPoolId()));
-          unbufOut = encryptedStreams.out;
-          unbufIn = encryptedStreams.in;
-        }
+        DataEncryptionKeyFactory keyFactory =
+          getDataEncryptionKeyFactoryForBlock(b);
+        IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
+          unbufIn, keyFactory, accessToken, bpReg);
+        unbufOut = saslStreams.out;
+        unbufIn = saslStreams.in;
         
         out = new DataOutputStream(new BufferedOutputStream(unbufOut,
             HdfsConstants.SMALL_BUFFER_SIZE));
@@ -1652,15 +1709,6 @@ public class DataNode extends Configured
             false, false, true, DataNode.this, null, cachingStrategy);
         DatanodeInfo srcNode = new DatanodeInfo(bpReg);
 
-        //
-        // Header info
-        //
-        Token<BlockTokenIdentifier> accessToken = BlockTokenSecretManager.DUMMY_TOKEN;
-        if (isBlockTokenEnabled) {
-          accessToken = blockPoolTokenSecretManager.generateToken(b, 
-              EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE));
-        }
-
         new Sender(out).writeBlock(b, accessToken, clientname, targets, srcNode,
             stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy);
 
@@ -1703,7 +1751,26 @@ public class DataNode extends Configured
       }
     }
   }
-  
+
+  /**
+   * Returns a new DataEncryptionKeyFactory that generates a key from the
+   * BlockPoolTokenSecretManager, using the block pool ID of the given block.
+   *
+   * @param block for which the factory needs to create a key
+   * @return DataEncryptionKeyFactory for block's block pool ID
+   */
+  DataEncryptionKeyFactory getDataEncryptionKeyFactoryForBlock(
+      final ExtendedBlock block) {
+    return new DataEncryptionKeyFactory() {
+      @Override
+      public DataEncryptionKey newDataEncryptionKey() {
+        return dnConf.encryptDataTransfer ?
+          blockPoolTokenSecretManager.generateDataEncryptionKey(
+            block.getBlockPoolId()) : null;
+      }
+    };
+  }
+
   /**
    * After a block becomes finalized, a datanode increases metric counter,
    * notifies namenode, and adds it to the block scanner

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1610479&r1=1610478&r2=1610479&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Mon Jul 14 18:28:02 2014
@@ -36,11 +36,9 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InterruptedIOException;
 import java.io.OutputStream;
-import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketException;
-import java.net.UnknownHostException;
 import java.nio.channels.ClosedChannelException;
 import java.security.MessageDigest;
 import java.util.Arrays;
@@ -52,13 +50,12 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor.InvalidMagicNumberException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Receiver;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
@@ -85,7 +82,6 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.util.DataChecksum;
 
 import com.google.common.base.Preconditions;
-import com.google.common.net.InetAddresses;
 import com.google.protobuf.ByteString;
 
 
@@ -174,24 +170,11 @@ class DataXceiver extends Receiver imple
       dataXceiverServer.addPeer(peer, Thread.currentThread());
       peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
       InputStream input = socketIn;
-      if ((!peer.hasSecureChannel()) && dnConf.encryptDataTransfer &&
-          !dnConf.trustedChannelResolver.isTrusted(getClientAddress(peer))){
-        IOStreamPair encryptedStreams = null;
-        try {
-          encryptedStreams = DataTransferEncryptor.getEncryptedStreams(socketOut,
-              socketIn, datanode.blockPoolTokenSecretManager,
-              dnConf.encryptionAlgorithm);
-        } catch (InvalidMagicNumberException imne) {
-          LOG.info("Failed to read expected encryption handshake from client " +
-              "at " + peer.getRemoteAddressString() + ". Perhaps the client " +
-              "is running an older version of Hadoop which does not support " +
-              "encryption");
-          return;
-        }
-        input = encryptedStreams.in;
-        socketOut = encryptedStreams.out;
-      }
-      input = new BufferedInputStream(input, HdfsConstants.SMALL_BUFFER_SIZE);
+      IOStreamPair saslStreams = datanode.saslServer.receive(peer, socketOut,
+        socketIn, datanode.getDatanodeId());
+      input = new BufferedInputStream(saslStreams.in,
+        HdfsConstants.SMALL_BUFFER_SIZE);
+      socketOut = saslStreams.out;
       
       super.initialize(new DataInputStream(input));
       
@@ -263,19 +246,6 @@ class DataXceiver extends Receiver imple
       }
     }
   }
-  
-  /**
-   * Returns InetAddress from peer
-   * The getRemoteAddressString is the form  /ip-address:port
-   * The ip-address is extracted from peer and InetAddress is formed
-   * @param peer
-   * @return
-   * @throws UnknownHostException
-   */
-  private static InetAddress getClientAddress(Peer peer) {
-    return InetAddresses.forString(
-        peer.getRemoteAddressString().split(":")[0].substring(1));
-  }
 
   @Override
   public void requestShortCircuitFds(final ExtendedBlock blk,
@@ -656,17 +626,12 @@ class DataXceiver extends Receiver imple
           OutputStream unbufMirrorOut = NetUtils.getOutputStream(mirrorSock,
               writeTimeout);
           InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock);
-          if (dnConf.encryptDataTransfer &&
-              !dnConf.trustedChannelResolver.isTrusted(mirrorSock.getInetAddress())) {
-            IOStreamPair encryptedStreams =
-                DataTransferEncryptor.getEncryptedStreams(
-                    unbufMirrorOut, unbufMirrorIn,
-                    datanode.blockPoolTokenSecretManager
-                        .generateDataEncryptionKey(block.getBlockPoolId()));
-            
-            unbufMirrorOut = encryptedStreams.out;
-            unbufMirrorIn = encryptedStreams.in;
-          }
+          DataEncryptionKeyFactory keyFactory =
+            datanode.getDataEncryptionKeyFactoryForBlock(block);
+          IOStreamPair saslStreams = datanode.saslClient.socketSend(mirrorSock,
+            unbufMirrorOut, unbufMirrorIn, keyFactory, blockToken, targets[0]);
+          unbufMirrorOut = saslStreams.out;
+          unbufMirrorIn = saslStreams.in;
           mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut,
               HdfsConstants.SMALL_BUFFER_SIZE));
           mirrorIn = new DataInputStream(unbufMirrorIn);
@@ -1026,17 +991,12 @@ class DataXceiver extends Receiver imple
       OutputStream unbufProxyOut = NetUtils.getOutputStream(proxySock,
           dnConf.socketWriteTimeout);
       InputStream unbufProxyIn = NetUtils.getInputStream(proxySock);
-      if (dnConf.encryptDataTransfer && 
-          !dnConf.trustedChannelResolver.isTrusted(
-              proxySock.getInetAddress())) {
-        IOStreamPair encryptedStreams =
-            DataTransferEncryptor.getEncryptedStreams(
-                unbufProxyOut, unbufProxyIn,
-                datanode.blockPoolTokenSecretManager
-                    .generateDataEncryptionKey(block.getBlockPoolId()));
-        unbufProxyOut = encryptedStreams.out;
-        unbufProxyIn = encryptedStreams.in;
-      }
+      DataEncryptionKeyFactory keyFactory =
+        datanode.getDataEncryptionKeyFactoryForBlock(block);
+      IOStreamPair saslStreams = datanode.saslClient.socketSend(proxySock,
+        unbufProxyOut, unbufProxyIn, keyFactory, blockToken, proxySource);
+      unbufProxyOut = saslStreams.out;
+      unbufProxyIn = saslStreams.in;
       
       proxyOut = new DataOutputStream(new BufferedOutputStream(unbufProxyOut, 
           HdfsConstants.SMALL_BUFFER_SIZE));

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java?rev=1610479&r1=1610478&r2=1610479&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java Mon Jul 14 18:28:02 2014
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
@@ -510,7 +511,7 @@ public class DatanodeJspHelper {
       JspHelper.streamBlockInAscii(new InetSocketAddress(req.getServerName(),
           datanodePort), bpid, blockId, blockToken, genStamp, blockSize,
           startOffset, chunkSizeToView, out, conf, dfs.getConf(),
-          dfs.getDataEncryptionKey());
+          dfs, getSaslDataTransferClient(req));
     } catch (Exception e) {
       out.print(e);
     }
@@ -669,7 +670,7 @@ public class DatanodeJspHelper {
     out.print("<textarea cols=\"100\" rows=\"25\" wrap=\"virtual\" style=\"width:100%\" READONLY>");
     JspHelper.streamBlockInAscii(addr, poolId, blockId, accessToken, genStamp,
         blockSize, startOffset, chunkSizeToView, out, conf, dfs.getConf(),
-        dfs.getDataEncryptionKey());
+        dfs, getSaslDataTransferClient(req));
     out.print("</textarea>");
     dfs.close();
   }
@@ -702,4 +703,16 @@ public class DatanodeJspHelper {
     return sb.toString();
   }
 
+  /**
+   * Gets the {@link SaslDataTransferClient} of the {@link DataNode} that is
+   * stored in the servlet context under the "datanode" attribute.
+   *
+   * @param req request whose session leads to the servlet context
+   * @return the saslClient field of that DataNode
+   */
+  private static SaslDataTransferClient getSaslDataTransferClient(
+      HttpServletRequest req) {
+    Object attr = req.getSession().getServletContext().getAttribute("datanode");
+    return ((DataNode)attr).saslClient;
+  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=1610479&r1=1610478&r2=1610479&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Mon Jul 14 18:28:02 2014
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
+
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -47,6 +50,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.RemotePeerFactory;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -55,6 +59,12 @@ import org.apache.hadoop.hdfs.protocol.H
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
+import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
 import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
@@ -65,6 +75,7 @@ import org.apache.hadoop.net.NetworkTopo
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Time;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -92,7 +103,7 @@ import com.google.common.annotations.Vis
  *  factors of each file.
  */
 @InterfaceAudience.Private
-public class NamenodeFsck {
+public class NamenodeFsck implements DataEncryptionKeyFactory {
   public static final Log LOG = LogFactory.getLog(NameNode.class.getName());
   
   // return string marking fsck status
@@ -149,6 +160,7 @@ public class NamenodeFsck {
   private List<String> snapshottableDirs = null;
 
   private final BlockPlacementPolicy bpPolicy;
+  private final SaslDataTransferClient saslClient;
 
   /**
    * Filesystem checker.
@@ -175,6 +187,12 @@ public class NamenodeFsck {
         networktopology,
         namenode.getNamesystem().getBlockManager().getDatanodeManager()
         .getHost2DatanodeMap());
+    this.saslClient = new SaslDataTransferClient(
+      DataTransferSaslUtil.getSaslPropertiesResolver(conf),
+      TrustedChannelResolver.getInstance(conf),
+      conf.getBoolean(
+        IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
+        IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT));
     
     for (Iterator<String> it = pmap.keySet().iterator(); it.hasNext();) {
       String key = it.next();
@@ -616,15 +634,16 @@ public class NamenodeFsck {
             setConfiguration(namenode.conf).
             setRemotePeerFactory(new RemotePeerFactory() {
               @Override
-              public Peer newConnectedPeer(InetSocketAddress addr)
+              public Peer newConnectedPeer(InetSocketAddress addr,
+                  Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
                   throws IOException {
                 Peer peer = null;
                 Socket s = NetUtils.getDefaultSocketFactory(conf).createSocket();
                 try {
                   s.connect(addr, HdfsServerConstants.READ_TIMEOUT);
                   s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
-                  peer = TcpPeerServer.peerFromSocketAndKey(s, namenode.getRpcServer().
-                        getDataEncryptionKey());
+                  peer = TcpPeerServer.peerFromSocketAndKey(saslClient, s,
+                        NamenodeFsck.this, blockToken, datanodeId);
                 } finally {
                   if (peer == null) {
                     IOUtils.closeQuietly(s);
@@ -663,7 +682,12 @@ public class NamenodeFsck {
       throw new Exception("Could not copy block data for " + lblock.getBlock());
     }
   }
-      
+
+  @Override // DataEncryptionKeyFactory
+  public DataEncryptionKey newDataEncryptionKey() throws IOException {
+    return namenode.getRpcServer().getDataEncryptionKey();
+  }
+
   /*
    * XXX (ab) See comment above for copyBlock().
    *

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1610479&r1=1610478&r2=1610479&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Mon Jul 14 18:28:02 2014
@@ -1452,6 +1452,37 @@
 </property>
 
 <property>
+  <name>dfs.data.transfer.protection</name>
+  <value></value>
+  <description>
+    A comma-separated list of SASL protection values used for secured
+    connections to the DataNode when reading or writing block data.  Possible
+    values are authentication, integrity and privacy.  authentication means
+    authentication only and no integrity or privacy; integrity implies
+    authentication and integrity are enabled; and privacy implies all of
+    authentication, integrity and privacy are enabled.  If
+    dfs.encrypt.data.transfer is set to true, then it supersedes the setting for
+    dfs.data.transfer.protection and enforces that all connections must use a
+    specialized encrypted SASL handshake.  This property is ignored for
+    connections to a DataNode listening on a privileged port.  In this case, it
+    is assumed that the use of a privileged port establishes sufficient trust.
+  </description>
+</property>
+
+<property>
+  <name>dfs.data.transfer.saslproperties.resolver.class</name>
+  <value></value>
+  <description>
+    SaslPropertiesResolver used to resolve the QOP used for a connection to the
+    DataNode when reading or writing block data.  If not specified, the full set
+    of values specified in dfs.data.transfer.protection is used while
+    determining the QOP used for the connection. If a class is specified, then
+    the QOP values returned by the class will be used while determining the QOP
+    used for the connection.
+  </description>
+</property>
+
+<property>
   <name>dfs.datanode.hdfs-blocks-metadata.enabled</name>
   <value>false</value>
   <description>

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java?rev=1610479&r1=1610478&r2=1610479&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java Mon Jul 14 18:28:02 2014
@@ -33,9 +33,11 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
@@ -48,6 +50,7 @@ import org.apache.hadoop.hdfs.shortcircu
 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 
@@ -192,7 +195,8 @@ public class BlockReaderTestUtil {
       setAllowShortCircuitLocalReads(true).
       setRemotePeerFactory(new RemotePeerFactory() {
         @Override
-        public Peer newConnectedPeer(InetSocketAddress addr)
+        public Peer newConnectedPeer(InetSocketAddress addr,
+            Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
             throws IOException {
           Peer peer = null;
           Socket sock = NetUtils.
@@ -251,4 +255,4 @@ public class BlockReaderTestUtil {
     LogManager.getLogger(DataNode.class.getName()).setLevel(
         Level.TRACE);
   }
-}
\ No newline at end of file
+}

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1610479&r1=1610478&r2=1610479&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java Mon Jul 14 18:28:02 2014
@@ -19,12 +19,15 @@ package org.apache.hadoop.hdfs;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
@@ -1310,15 +1313,42 @@ public class MiniDFSCluster {
       }
 
       SecureResources secureResources = null;
-      if (UserGroupInformation.isSecurityEnabled()) {
+      if (UserGroupInformation.isSecurityEnabled() &&
+          conf.get(DFS_DATA_TRANSFER_PROTECTION_KEY) == null) {
         try {
           secureResources = SecureDataNodeStarter.getSecureResources(dnConf);
         } catch (Exception ex) {
           ex.printStackTrace();
         }
       }
-      DataNode dn = DataNode.instantiateDataNode(dnArgs, dnConf,
-                                                 secureResources);
+      final int maxRetriesOnSasl = conf.getInt(
+        IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY,
+        IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_DEFAULT);
+      int numRetries = 0;
+      DataNode dn = null;
+      while (true) {
+        try {
+          dn = DataNode.instantiateDataNode(dnArgs, dnConf,
+                                            secureResources);
+          break;
+        } catch (IOException e) {
+          // Work around issue testing security where rapidly starting multiple
+          // DataNodes using the same principal gets rejected by the KDC as a
+          // replay attack.
+          if (UserGroupInformation.isSecurityEnabled() &&
+              numRetries < maxRetriesOnSasl) {
+            try {
+              Thread.sleep(1000);
+            } catch (InterruptedException ie) {
+              Thread.currentThread().interrupt();
+              break;
+            }
+            ++numRetries;
+            continue;
+          }
+          throw e;
+        }
+      }
       if(dn == null)
         throw new IOException("Cannot start DataNode in "
             + dnConf.get(DFS_DATANODE_DATA_DIR_KEY));

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferTestCase.java?rev=1610479&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferTestCase.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferTestCase.java Mon Jul 14 18:28:02 2014
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.util.Properties;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+/** Base for secure-cluster tests; runs one MiniKdc for each test class. */
+public abstract class SaslDataTransferTestCase {
+  private static File baseDir;
+  private static String hdfsPrincipal;
+  private static MiniKdc kdc;
+  private static String keytab;
+  private static String spnegoPrincipal;
+
+  @BeforeClass
+  public static void initKdc() throws Exception {
+    // Recreate an empty working directory named after this class.
+    String root = System.getProperty("test.build.dir", "target/test-dir");
+    baseDir = new File(root, SaslDataTransferTestCase.class.getSimpleName());
+    FileUtil.fullyDelete(baseDir);
+    assertTrue(baseDir.mkdirs());
+    Properties kdcConf = MiniKdc.createConf();
+    kdc = new MiniKdc(kdcConf, baseDir);
+    kdc.start();
+    // <user>/localhost and HTTP/localhost are created with one shared keytab.
+    String userName = UserGroupInformation.getLoginUser().getShortUserName();
+    File keytabFile = new File(baseDir, userName + ".keytab");
+    keytab = keytabFile.getAbsolutePath();
+    kdc.createPrincipal(keytabFile, userName + "/localhost", "HTTP/localhost");
+    hdfsPrincipal = userName + "/localhost@" + kdc.getRealm();
+    spnegoPrincipal = "HTTP/localhost@" + kdc.getRealm();
+  }
+
+  @AfterClass
+  public static void shutdownKdc() {
+    if (kdc != null) {
+      kdc.stop();
+    }
+    FileUtil.fullyDelete(baseDir);
+  }
+
+  /**
+   * Builds a new configuration for a cluster secured with Kerberos.
+   *
+   * @param dataTransferProtection QOPs for DFS_DATA_TRANSFER_PROTECTION_KEY
+   * @return configuration for starting a secure cluster
+   * @throws Exception if there is any failure
+   */
+  protected HdfsConfiguration createSecureConfig(
+      String dataTransferProtection) throws Exception {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, conf);
+    conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+    conf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, dataTransferProtection);
+    conf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY, 10);
+    // NameNode and DataNode are given the same principal and keytab.
+    conf.set(DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, hdfsPrincipal);
+    conf.set(DFS_NAMENODE_KEYTAB_FILE_KEY, keytab);
+    conf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, hdfsPrincipal);
+    conf.set(DFS_DATANODE_KEYTAB_FILE_KEY, keytab);
+    conf.set(DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, spnegoPrincipal);
+    // Web endpoints are HTTPS only and bind to localhost port 0.
+    conf.set(DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
+    conf.set(DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
+    conf.set(DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");
+    KeyStoreTestUtil.setupSSLConfig(baseDir.getAbsolutePath(),
+      KeyStoreTestUtil.getClasspathDir(getClass()), conf, false);
+    return conf;
+  }
+}

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/TestSaslDataTransfer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/TestSaslDataTransfer.java?rev=1610479&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/TestSaslDataTransfer.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/TestSaslDataTransfer.java Mon Jul 14 18:28:02 2014
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/** Tests DataTransferProtocol with varying cluster and client QOPs. */
+public class TestSaslDataTransfer extends SaslDataTransferTestCase {
+
+  private static final String ALL_QOPS = "authentication,integrity,privacy";
+  private static final int BLOCK_SIZE = 4096;
+  private static final int NUM_BLOCKS = 3;
+  private static final Path PATH = new Path("/file1");
+  private static final short REPLICATION = 3;
+
+  private MiniDFSCluster cluster;
+  private FileSystem fs;
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  @After
+  public void shutdown() {
+    IOUtils.cleanup(null, fs);
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testAuthentication() throws Exception {
+    doTest(startClusterAndCreateClientConf(ALL_QOPS, "authentication"));
+  }
+
+  @Test
+  public void testIntegrity() throws Exception {
+    doTest(startClusterAndCreateClientConf(ALL_QOPS, "integrity"));
+  }
+
+  @Test
+  public void testPrivacy() throws Exception {
+    doTest(startClusterAndCreateClientConf(ALL_QOPS, "privacy"));
+  }
+
+  @Test
+  public void testClientAndServerDoNotHaveCommonQop() throws Exception {
+    doTestExpectingWriteFailure("privacy", "authentication");
+  }
+
+  @Test
+  public void testClientSaslNoServerSasl() throws Exception {
+    doTestExpectingWriteFailure("", "authentication");
+  }
+
+  @Test
+  public void testServerSaslNoClientSasl() throws Exception {
+    doTestExpectingWriteFailure(ALL_QOPS, "");
+  }
+
+  /**
+   * Starts a cluster, then builds a client configuration from the cluster's.
+   * @param clusterQop DFS_DATA_TRANSFER_PROTECTION_KEY value for the cluster
+   * @param clientQop DFS_DATA_TRANSFER_PROTECTION_KEY value for the client
+   * @return copy of the cluster configuration that carries clientQop
+   * @throws Exception if the configuration or the cluster cannot be created
+   */
+  private HdfsConfiguration startClusterAndCreateClientConf(
+      String clusterQop, String clientQop) throws Exception {
+    HdfsConfiguration clusterConf = createSecureConfig(clusterQop);
+    startCluster(clusterConf);
+    HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf);
+    clientConf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, clientQop);
+    return clientConf;
+  }
+
+  /**
+   * Runs {@link #doTest} and expects it to fail with an IOException whose
+   * message contains "could only be replicated to 0 nodes".
+   * @param clusterQop DFS_DATA_TRANSFER_PROTECTION_KEY value for the cluster
+   * @param clientQop DFS_DATA_TRANSFER_PROTECTION_KEY value for the client
+   */
+  private void doTestExpectingWriteFailure(String clusterQop,
+      String clientQop) throws Exception {
+    HdfsConfiguration clientConf =
+      startClusterAndCreateClientConf(clusterQop, clientQop);
+    // Set the expectation after startup so it cannot match a startup failure.
+    exception.expect(IOException.class);
+    exception.expectMessage("could only be replicated to 0 nodes");
+    doTest(clientConf);
+  }
+
+  /**
+   * Writes PATH through a client using conf, reads it back, and checks that
+   * each of its NUM_BLOCKS blocks is reported on REPLICATION hosts.
+   * @param conf client configuration
+   * @throws IOException if there is an I/O error
+   */
+  private void doTest(HdfsConfiguration conf) throws IOException {
+    fs = FileSystem.get(cluster.getURI(), conf);
+    FileSystemTestHelper.createFile(fs, PATH, NUM_BLOCKS, BLOCK_SIZE);
+    assertArrayEquals(FileSystemTestHelper.getFileData(NUM_BLOCKS, BLOCK_SIZE),
+      DFSTestUtil.readFile(fs, PATH).getBytes("UTF-8"));
+    BlockLocation[] blockLocations = fs.getFileBlockLocations(PATH, 0,
+      Long.MAX_VALUE);
+    assertNotNull(blockLocations);
+    assertEquals(NUM_BLOCKS, blockLocations.length);
+    for (BlockLocation blockLocation: blockLocations) {
+      assertNotNull(blockLocation.getHosts());
+      assertEquals(REPLICATION, blockLocation.getHosts().length);
+    }
+  }
+
+  /**
+   * Starts a cluster of REPLICATION DataNodes with the given configuration.
+   *
+   * @param conf cluster configuration
+   * @throws IOException if there is an I/O error
+   */
+  private void startCluster(HdfsConfiguration conf) throws IOException {
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPLICATION)
+      .build();
+    cluster.waitActive();
+  }
+}

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithSaslDataTransfer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithSaslDataTransfer.java?rev=1610479&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithSaslDataTransfer.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithSaslDataTransfer.java Mon Jul 14 18:28:02 2014
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.balancer;
+
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferTestCase;
+import org.junit.Test;
+
+public class TestBalancerWithSaslDataTransfer extends SaslDataTransferTestCase {
+  // Delegate whose testBalancer0Internal runs on a secure config, once per QOP.
+  private static final TestBalancer TEST_BALANCER = new TestBalancer();
+
+  @Test
+  public void testBalancer0Authentication() throws Exception {
+    TEST_BALANCER.testBalancer0Internal(createSecureConfig("authentication"));
+  }
+
+  @Test
+  public void testBalancer0Integrity() throws Exception {
+    TEST_BALANCER.testBalancer0Internal(createSecureConfig("integrity"));
+  }
+
+  @Test
+  public void testBalancer0Privacy() throws Exception {
+    TEST_BALANCER.testBalancer0Internal(createSecureConfig("privacy"));
+  }
+}

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java?rev=1610479&r1=1610478&r2=1610479&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java Mon Jul 14 18:28:02 2014
@@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.MiniDFSClu
 import org.apache.hadoop.hdfs.RemotePeerFactory;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -160,7 +161,8 @@ public class TestBlockTokenWithDFS {
           setConfiguration(conf).
           setRemotePeerFactory(new RemotePeerFactory() {
             @Override
-            public Peer newConnectedPeer(InetSocketAddress addr)
+            public Peer newConnectedPeer(InetSocketAddress addr,
+                Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
                 throws IOException {
               Peer peer = null;
               Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket();

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java?rev=1610479&r1=1610478&r2=1610479&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java Mon Jul 14 18:28:02 2014
@@ -46,9 +46,11 @@ import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@@ -58,6 +60,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -307,7 +310,8 @@ public class TestDataNodeVolumeFailure {
       setConfiguration(conf).
       setRemotePeerFactory(new RemotePeerFactory() {
         @Override
-        public Peer newConnectedPeer(InetSocketAddress addr)
+        public Peer newConnectedPeer(InetSocketAddress addr,
+            Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
             throws IOException {
           Peer peer = null;
           Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket();