You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cl...@apache.org on 2019/08/16 17:59:31 UTC

[hadoop] 03/04: HDFS-13699. Add DFSClient sending handshake token to DataNode, and allow DataNode overwrite downstream QOP. Contributed by Chen Liang.

This is an automated email from the ASF dual-hosted git repository.

cliang pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit f507bc059d6cc22a2eb04459251f23d8bf89535c
Author: Chen Liang <cl...@apache.org>
AuthorDate: Fri Apr 12 17:37:51 2019 -0700

    HDFS-13699. Add DFSClient sending handshake token to DataNode, and allow DataNode overwrite downstream QOP. Contributed by Chen Liang.
---
 .../hadoop/security/token/SecretManager.java       |   2 +-
 .../hadoop/hdfs/client/HdfsClientConfigKeys.java   |   3 +
 .../datatransfer/sasl/DataTransferSaslUtil.java    |  72 +++++++
 .../datatransfer/sasl/SaslDataTransferClient.java  | 117 +++++++++--
 .../src/main/proto/datatransfer.proto              |   6 +
 .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java |   4 +
 .../datatransfer/sasl/SaslDataTransferServer.java  |  53 ++++-
 .../apache/hadoop/hdfs/server/datanode/DNConf.java |   6 +
 .../hadoop/hdfs/server/datanode/DataNode.java      |   7 +-
 .../hadoop/hdfs/server/datanode/DataXceiver.java   |  14 +-
 .../src/main/resources/hdfs-default.xml            |  22 +++
 .../apache/hadoop/hdfs/TestHAAuxiliaryPort.java    |   2 +-
 .../apache/hadoop/hdfs/TestMultipleNNPortQOP.java  | 219 +++++++++++++++++++++
 .../datanode/TestDataXceiverBackwardsCompat.java   |   3 +-
 14 files changed, 502 insertions(+), 28 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/SecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/SecretManager.java
index 798c8c9..514806d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/SecretManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/SecretManager.java
@@ -167,7 +167,7 @@ public abstract class SecretManager<T extends TokenIdentifier> {
    * @param key the secret key
    * @return the bytes of the generated password
    */
-  protected static byte[] createPassword(byte[] identifier, 
+  public static byte[] createPassword(byte[] identifier,
                                          SecretKey key) {
     Mac mac = threadLocalMac.get();
     try {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index 55b1722..3d934b4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -157,6 +157,9 @@ public interface HdfsClientConfigKeys {
   String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY =
       "dfs.encrypt.data.transfer.cipher.suites";
 
+  String DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_NEW_QOP_KEY =
+      "dfs.encrypt.data.overwrite.downstream.new.qop";
+
   String DFS_DATA_TRANSFER_PROTECTION_KEY = "dfs.data.transfer.protection";
   String DFS_DATA_TRANSFER_PROTECTION_DEFAULT = "";
   String DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY =
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java
index f4651eb..666a29f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.HandshakeSecretProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CipherOptionProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.security.SaslPropertiesResolver;
@@ -249,6 +250,51 @@ public final class DataTransferSaslUtil {
     }
   }
 
+  static class SaslMessageWithHandshake {
+    private final byte[] payload;
+    private final byte[] secret;
+    private final String bpid;
+
+    SaslMessageWithHandshake(byte[] payload, byte[] secret, String bpid) {
+      this.payload = payload;
+      this.secret = secret;
+      this.bpid = bpid;
+    }
+
+    byte[] getPayload() {
+      return payload;
+    }
+
+    byte[] getSecret() {
+      return secret;
+    }
+
+    String getBpid() {
+      return bpid;
+    }
+  }
+
+  public static SaslMessageWithHandshake readSaslMessageWithHandshakeSecret(
+      InputStream in) throws IOException {
+    DataTransferEncryptorMessageProto proto =
+        DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in));
+    if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
+      throw new InvalidEncryptionKeyException(proto.getMessage());
+    } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
+      throw new IOException(proto.getMessage());
+    } else {
+      byte[] payload = proto.getPayload().toByteArray();
+      byte[] secret = null;
+      String bpid = null;
+      if (proto.hasHandshakeSecret()) {
+        HandshakeSecretProto handshakeSecret = proto.getHandshakeSecret();
+        secret = handshakeSecret.getSecret().toByteArray();
+        bpid = handshakeSecret.getBpid();
+      }
+      return new SaslMessageWithHandshake(payload, secret, bpid);
+    }
+  }
+
   /**
    * Negotiate a cipher option which server supports.
    *
@@ -375,6 +421,12 @@ public final class DataTransferSaslUtil {
     sendSaslMessage(out, DataTransferEncryptorStatus.SUCCESS, payload, null);
   }
 
+  public static void sendSaslMessageHandshakeSecret(OutputStream out,
+      byte[] payload, byte[] secret, String bpid) throws IOException {
+    sendSaslMessageHandshakeSecret(out, DataTransferEncryptorStatus.SUCCESS,
+        payload, null, secret, bpid);
+  }
+
   /**
    * Send a SASL negotiation message and negotiation cipher options to server.
    *
@@ -497,6 +549,13 @@ public final class DataTransferSaslUtil {
   public static void sendSaslMessage(OutputStream out,
       DataTransferEncryptorStatus status, byte[] payload, String message)
       throws IOException {
+    sendSaslMessage(out, status, payload, message, null);
+  }
+
+  public static void sendSaslMessage(OutputStream out,
+      DataTransferEncryptorStatus status, byte[] payload, String message,
+      HandshakeSecretProto handshakeSecret)
+      throws IOException {
     DataTransferEncryptorMessageProto.Builder builder =
         DataTransferEncryptorMessageProto.newBuilder();
 
@@ -507,12 +566,25 @@ public final class DataTransferSaslUtil {
     if (message != null) {
       builder.setMessage(message);
     }
+    if (handshakeSecret != null) {
+      builder.setHandshakeSecret(handshakeSecret);
+    }
 
     DataTransferEncryptorMessageProto proto = builder.build();
     proto.writeDelimitedTo(out);
     out.flush();
   }
 
+  public static void sendSaslMessageHandshakeSecret(OutputStream out,
+      DataTransferEncryptorStatus status, byte[] payload, String message,
+      byte[] secret, String bpid) throws IOException {
+    HandshakeSecretProto.Builder builder =
+        HandshakeSecretProto.newBuilder();
+    builder.setSecret(ByteString.copyFrom(secret));
+    builder.setBpid(bpid);
+    sendSaslMessage(out, status, payload, message, builder.build());
+  }
+
   /**
    * There is no reason to instantiate this class.
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java
index a23a108..8d1c7f6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java
@@ -18,8 +18,11 @@
 package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
 
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_NEW_QOP_KEY;
 import static org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.*;
 
+import com.google.common.annotations.VisibleForTesting;
+import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -31,6 +34,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import javax.crypto.SecretKey;
 import javax.security.auth.callback.Callback;
 import javax.security.auth.callback.CallbackHandler;
 import javax.security.auth.callback.NameCallback;
@@ -39,6 +43,7 @@ import javax.security.auth.callback.UnsupportedCallbackException;
 import javax.security.sasl.RealmCallback;
 import javax.security.sasl.RealmChoiceCallback;
 
+import javax.security.sasl.Sasl;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -54,6 +59,7 @@ import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.security.SaslPropertiesResolver;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.Token;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -83,6 +89,10 @@ public class SaslDataTransferClient {
   private final SaslPropertiesResolver saslPropsResolver;
   private final TrustedChannelResolver trustedChannelResolver;
 
+  // Store the QOP most recently requested for a general SASL handshake,
+  // for testing purposes only
+  private String targetQOP;
+
   /**
    * Creates a new SaslDataTransferClient.  This constructor is used in cases
    * where it is not relevant to track if a secure client did a fallback to
@@ -140,7 +150,7 @@ public class SaslDataTransferClient {
     DataEncryptionKey encryptionKey = !trustedChannelResolver.isTrusted() ?
         encryptionKeyFactory.newDataEncryptionKey() : null;
     IOStreamPair ios = send(socket.getInetAddress(), underlyingOut,
-        underlyingIn, encryptionKey, accessToken, datanodeId);
+        underlyingIn, encryptionKey, accessToken, datanodeId, null);
     return ios != null ? ios : new IOStreamPair(underlyingIn, underlyingOut);
   }
 
@@ -180,8 +190,19 @@ public class SaslDataTransferClient {
       InputStream underlyingIn, DataEncryptionKeyFactory encryptionKeyFactory,
       Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
       throws IOException {
+    return socketSend(socket, underlyingOut, underlyingIn, encryptionKeyFactory,
+        accessToken, datanodeId, null);
+  }
+
+  public IOStreamPair socketSend(
+      Socket socket, OutputStream underlyingOut, InputStream underlyingIn,
+      DataEncryptionKeyFactory encryptionKeyFactory,
+      Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId,
+      SecretKey secretKey)
+      throws IOException {
     IOStreamPair ios = checkTrustAndSend(socket.getInetAddress(), underlyingOut,
-        underlyingIn, encryptionKeyFactory, accessToken, datanodeId);
+        underlyingIn, encryptionKeyFactory, accessToken, datanodeId,
+        secretKey);
     return ios != null ? ios : new IOStreamPair(underlyingIn, underlyingOut);
   }
 
@@ -203,17 +224,26 @@ public class SaslDataTransferClient {
       DataEncryptionKeyFactory encryptionKeyFactory,
       Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
       throws IOException {
+    return checkTrustAndSend(addr, underlyingOut, underlyingIn,
+        encryptionKeyFactory, accessToken, datanodeId, null);
+  }
+
+  private IOStreamPair checkTrustAndSend(
+      InetAddress addr, OutputStream underlyingOut, InputStream underlyingIn,
+      DataEncryptionKeyFactory encryptionKeyFactory,
+      Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId,
+      SecretKey secretKey)
+      throws IOException {
     boolean localTrusted = trustedChannelResolver.isTrusted();
     boolean remoteTrusted = trustedChannelResolver.isTrusted(addr);
-    LOG.debug("SASL encryption trust check: localHostTrusted = {}, "
+    LOG.info("SASL encryption trust check: localHostTrusted = {}, "
         + "remoteHostTrusted = {}", localTrusted, remoteTrusted);
-
     if (!localTrusted || !remoteTrusted) {
       // The encryption key factory only returns a key if encryption is enabled.
-      DataEncryptionKey encryptionKey = encryptionKeyFactory
-          .newDataEncryptionKey();
+      DataEncryptionKey encryptionKey =
+          encryptionKeyFactory.newDataEncryptionKey();
       return send(addr, underlyingOut, underlyingIn, encryptionKey, accessToken,
-          datanodeId);
+          datanodeId, secretKey);
     } else {
       LOG.debug(
           "SASL client skipping handshake on trusted connection for addr = {}, "
@@ -237,13 +267,14 @@ public class SaslDataTransferClient {
    */
   private IOStreamPair send(InetAddress addr, OutputStream underlyingOut,
       InputStream underlyingIn, DataEncryptionKey encryptionKey,
-      Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
+      Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId,
+      SecretKey secretKey)
       throws IOException {
     if (encryptionKey != null) {
       LOG.debug("SASL client doing encrypted handshake for addr = {}, "
           + "datanodeId = {}", addr, datanodeId);
       return getEncryptedStreams(addr, underlyingOut, underlyingIn,
-          encryptionKey);
+          encryptionKey, accessToken, secretKey);
     } else if (!UserGroupInformation.isSecurityEnabled()) {
       LOG.debug("SASL client skipping handshake in unsecured configuration for "
           + "addr = {}, datanodeId = {}", addr, datanodeId);
@@ -264,7 +295,8 @@ public class SaslDataTransferClient {
       LOG.debug(
           "SASL client doing general handshake for addr = {}, datanodeId = {}",
           addr, datanodeId);
-      return getSaslStreams(addr, underlyingOut, underlyingIn, accessToken);
+      return getSaslStreams(addr, underlyingOut, underlyingIn,
+          accessToken, secretKey);
     } else {
       // It's a secured cluster using non-privileged ports, but no SASL.  The
       // only way this can happen is if the DataNode has
@@ -287,11 +319,20 @@ public class SaslDataTransferClient {
    * @throws IOException for any error
    */
   private IOStreamPair getEncryptedStreams(InetAddress addr,
-      OutputStream underlyingOut,
-      InputStream underlyingIn, DataEncryptionKey encryptionKey)
+      OutputStream underlyingOut, InputStream underlyingIn,
+      DataEncryptionKey encryptionKey,
+      Token<BlockTokenIdentifier> accessToken,
+      SecretKey secretKey)
       throws IOException {
     Map<String, String> saslProps = createSaslPropertiesForEncryption(
         encryptionKey.encryptionAlgorithm);
+    if (secretKey != null) {
+      LOG.debug("DataNode overwriting downstream QOP" +
+          saslProps.get(Sasl.QOP));
+      byte[] newSecret =  SecretManager.createPassword(saslProps.get(Sasl.QOP)
+          .getBytes(Charsets.UTF_8), secretKey);
+      accessToken.setDNHandshakeSecret(newSecret);
+    }
 
     LOG.debug("Client using encryption algorithm {}",
         encryptionKey.encryptionAlgorithm);
@@ -301,7 +342,7 @@ public class SaslDataTransferClient {
     CallbackHandler callbackHandler = new SaslClientCallbackHandler(userName,
         password);
     return doSaslHandshake(addr, underlyingOut, underlyingIn, userName,
-        saslProps, callbackHandler);
+        saslProps, callbackHandler, accessToken);
   }
 
   /**
@@ -370,6 +411,11 @@ public class SaslDataTransferClient {
     }
   }
 
+  @VisibleForTesting
+  public String getTargetQOP() {
+    return targetQOP;
+  }
+
   /**
    * Sends client SASL negotiation for general-purpose handshake.
    *
@@ -382,16 +428,36 @@ public class SaslDataTransferClient {
    */
   private IOStreamPair getSaslStreams(InetAddress addr,
       OutputStream underlyingOut, InputStream underlyingIn,
-      Token<BlockTokenIdentifier> accessToken)
+      Token<BlockTokenIdentifier> accessToken,
+      SecretKey secretKey)
       throws IOException {
     Map<String, String> saslProps = saslPropsResolver.getClientProperties(addr);
 
+    // secretKey is non-null only when this is called by a DN sending to a
+    // downstream DN. When called from a client it is null, because the
+    // client has no key with which to compute a MAC.
+    // If a different QOP is configured for inter-DN communication, the
+    // block below applies it and then derives the handshake secret from
+    // the QOP in effect, so the secret encodes that QOP.
+    if (secretKey != null) {
+      String newQOP = conf
+          .get(DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_NEW_QOP_KEY);
+      if (newQOP != null) {
+        saslProps.put(Sasl.QOP, newQOP);
+      }
+      LOG.debug("DataNode overwriting downstream QOP " +
+          saslProps.get(Sasl.QOP));
+      byte[] newSecret = SecretManager.createPassword(
+          saslProps.get(Sasl.QOP).getBytes(Charsets.UTF_8), secretKey);
+      accessToken.setDNHandshakeSecret(newSecret);
+    }
+    targetQOP = saslProps.get(Sasl.QOP);
     String userName = buildUserName(accessToken);
     char[] password = buildClientPassword(accessToken);
     CallbackHandler callbackHandler = new SaslClientCallbackHandler(userName,
         password);
     return doSaslHandshake(addr, underlyingOut, underlyingIn, userName,
-        saslProps, callbackHandler);
+        saslProps, callbackHandler, accessToken);
   }
 
   /**
@@ -435,8 +501,8 @@ public class SaslDataTransferClient {
    */
   private IOStreamPair doSaslHandshake(InetAddress addr,
       OutputStream underlyingOut, InputStream underlyingIn, String userName,
-      Map<String, String> saslProps,
-      CallbackHandler callbackHandler) throws IOException {
+      Map<String, String> saslProps, CallbackHandler callbackHandler,
+      Token<BlockTokenIdentifier> accessToken) throws IOException {
 
     DataOutputStream out = new DataOutputStream(underlyingOut);
     DataInputStream in = new DataInputStream(underlyingIn);
@@ -449,7 +515,22 @@ public class SaslDataTransferClient {
 
     try {
       // Start of handshake - "initial response" in SASL terminology.
-      sendSaslMessage(out, new byte[0]);
+      // The handshake secret can be null. This happens when the client runs
+      // a new version but the cluster does not have this feature, in which
+      // case the NN sends no encrypted secret.
+      byte[] handshakeSecret = accessToken.getDnHandshakeSecret();
+      if (handshakeSecret == null || handshakeSecret.length == 0) {
+        LOG.debug("Handshake secret is null, sending without "
+            + "handshake secret.");
+        sendSaslMessage(out, new byte[0]);
+      } else {
+        LOG.debug("Sending handshake secret.");
+        BlockTokenIdentifier identifier = new BlockTokenIdentifier();
+        identifier.readFields(new DataInputStream(
+            new ByteArrayInputStream(accessToken.getIdentifier())));
+        String bpid = identifier.getBlockPoolId();
+        sendSaslMessageHandshakeSecret(out, new byte[0], handshakeSecret, bpid);
+      }
 
       // step 1
       byte[] remoteResponse = readSaslMessage(in);
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
index 384da54..43a03e9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
@@ -43,6 +43,12 @@ message DataTransferEncryptorMessageProto {
   optional bytes payload = 2;
   optional string message = 3;
   repeated CipherOptionProto cipherOption = 4;
+  optional HandshakeSecretProto handshakeSecret = 5;
+}
+
+message HandshakeSecretProto {
+  required bytes secret = 1;
+  required string bpid = 2;
 }
 
 message BaseHeaderProto {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index b05c08d..7466021 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -1006,6 +1006,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
 
   // Security-related configs
   public static final String DFS_ENCRYPT_DATA_TRANSFER_KEY = "dfs.encrypt.data.transfer";
+  public static final String DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_KEY =
+      "dfs.encrypt.data.overwrite.downstream.derived.qop";
+  public static final boolean DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_DEFAULT =
+      false;
   public static final boolean DFS_ENCRYPT_DATA_TRANSFER_DEFAULT = false;
   public static final String DFS_XFRAME_OPTION_ENABLED = "dfs.xframe.enabled";
   public static final boolean DFS_XFRAME_OPTION_ENABLED_DEFAULT = true;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java
index e3a72d0..d162d9e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java
@@ -21,15 +21,18 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSF
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
 import static org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.*;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
+import javax.crypto.SecretKey;
 import javax.security.auth.callback.Callback;
 import javax.security.auth.callback.CallbackHandler;
 import javax.security.auth.callback.NameCallback;
@@ -37,6 +40,7 @@ import javax.security.auth.callback.PasswordCallback;
 import javax.security.auth.callback.UnsupportedCallbackException;
 import javax.security.sasl.AuthorizeCallback;
 import javax.security.sasl.RealmCallback;
+import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslException;
 
 import org.apache.commons.codec.binary.Base64;
@@ -48,12 +52,15 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
+import org.apache.hadoop.hdfs.security.token.block.BlockKey;
 import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.DNConf;
 import org.apache.hadoop.security.SaslPropertiesResolver;
+import org.apache.hadoop.security.SaslRpcServer;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -78,6 +85,10 @@ public class SaslDataTransferServer {
   private final BlockPoolTokenSecretManager blockPoolTokenSecretManager;
   private final DNConf dnConf;
 
+  // Store the most recent successfully negotiated QOP,
+  // for testing purposes only
+  private String negotiatedQOP;
+
   /**
    * Creates a new SaslDataTransferServer.
    *
@@ -337,6 +348,26 @@ public class SaslDataTransferServer {
     return identifier;
   }
 
+  private String examineSecret(byte[] secret, String bpid) {
+    BlockKey blockKey = blockPoolTokenSecretManager.get(bpid).getCurrentKey();
+    SecretKey secretKey = blockKey.getKey();
+    for (SaslRpcServer.QualityOfProtection qop :
+        SaslRpcServer.QualityOfProtection.values()) {
+      String qopString = qop.getSaslQop();
+      byte[] data = qopString.getBytes(Charsets.UTF_8);
+      byte[] encryptedData = SecretManager.createPassword(data, secretKey);
+      if (Arrays.equals(encryptedData, secret)) {
+        return qopString;
+      }
+    }
+    return null;
+  }
+
+  @VisibleForTesting
+  public String getNegotiatedQOP() {
+    return negotiatedQOP;
+  }
+
   /**
    * This method actually executes the server-side SASL handshake.
    *
@@ -355,9 +386,6 @@ public class SaslDataTransferServer {
     DataInputStream in = new DataInputStream(underlyingIn);
     DataOutputStream out = new DataOutputStream(underlyingOut);
 
-    SaslParticipant sasl = SaslParticipant.createServerSaslParticipant(saslProps,
-      callbackHandler);
-
     int magicNumber = in.readInt();
     if (magicNumber != SASL_TRANSFER_MAGIC_NUMBER) {
       throw new InvalidMagicNumberException(magicNumber, 
@@ -365,7 +393,23 @@ public class SaslDataTransferServer {
     }
     try {
       // step 1
-      byte[] remoteResponse = readSaslMessage(in);
+      SaslMessageWithHandshake message = readSaslMessageWithHandshakeSecret(in);
+      byte[] secret = message.getSecret();
+      String bpid = message.getBpid();
+      if (secret != null || bpid != null) {
+        // sanity check: if one is non-null, the other must be non-null too
+        assert(secret != null && bpid != null);
+        String qop = examineSecret(secret, bpid);
+        if (qop != null) {
+          saslProps.put(Sasl.QOP, qop);
+        } else {
+          LOG.error("Unable to match secret to a QOP!");
+        }
+      }
+      SaslParticipant sasl = SaslParticipant.createServerSaslParticipant(
+          saslProps, callbackHandler);
+
+      byte[] remoteResponse = message.getPayload();
       byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
       sendSaslMessage(out, localResponse);
 
@@ -379,6 +423,7 @@ public class SaslDataTransferServer {
       checkSaslComplete(sasl, saslProps);
 
       CipherOption cipherOption = null;
+      negotiatedQOP = sasl.getNegotiatedQop();
       if (sasl.isNegotiatedQopPrivacy()) {
         // Negotiate a cipher option
         Configuration conf = dnConf.getConf();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
index 4177e0e..155b800 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
@@ -32,6 +32,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_P
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
@@ -89,6 +91,7 @@ public class DNConf {
   final boolean syncOnClose;
   final boolean encryptDataTransfer;
   final boolean connectToDnViaHostname;
+  final boolean overwriteDownstreamDerivedQOP;
 
   final long readaheadLength;
   final long heartBeatInterval;
@@ -239,6 +242,9 @@ public class DNConf {
     this.encryptDataTransfer = getConf().getBoolean(
         DFS_ENCRYPT_DATA_TRANSFER_KEY,
         DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
+    this.overwriteDownstreamDerivedQOP = getConf().getBoolean(
+        DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_KEY,
+        DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_DEFAULT);
     this.encryptionAlgorithm = getConf().get(DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
     this.trustedChannelResolver = TrustedChannelResolver.getInstance(getConf());
     this.saslPropsResolver = DataTransferSaslUtil.getSaslPropertiesResolver(
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 3b8006a..55dacd2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1781,7 +1781,12 @@ public class DataNode extends ReconfigurableBase
   public int getXferPort() {
     return streamingAddr.getPort();
   }
-  
+
+  @VisibleForTesting
+  public SaslDataTransferServer getSaslServer() {
+    return saslServer;
+  }
+
   /**
    * @return name useful for logging
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 53d650d..55849f8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
+import javax.crypto.SecretKey;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSUtilClient;
@@ -47,6 +48,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCirc
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.security.token.block.BlockKey;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.BlockChecksumComputer;
 import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.AbstractBlockChecksumComputer;
@@ -798,8 +800,16 @@ class DataXceiver extends Receiver implements Runnable {
           InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock);
           DataEncryptionKeyFactory keyFactory =
             datanode.getDataEncryptionKeyFactoryForBlock(block);
-          IOStreamPair saslStreams = datanode.saslClient.socketSend(mirrorSock,
-            unbufMirrorOut, unbufMirrorIn, keyFactory, blockToken, targets[0]);
+          SecretKey secretKey = null;
+          if (dnConf.overwriteDownstreamDerivedQOP) {
+            String bpid = block.getBlockPoolId();
+            BlockKey blockKey = datanode.blockPoolTokenSecretManager
+                .get(bpid).getCurrentKey();
+            secretKey = blockKey.getKey();
+          }
+          IOStreamPair saslStreams = datanode.saslClient.socketSend(
+              mirrorSock, unbufMirrorOut, unbufMirrorIn, keyFactory,
+              blockToken, targets[0], secretKey);
           unbufMirrorOut = saslStreams.out;
           unbufMirrorIn = saslStreams.in;
           mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 165c804..1091403 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -5247,6 +5247,28 @@
   </property>
 
   <property>
+    <name>dfs.encrypt.data.overwrite.downstream.derived.qop</name>
+    <value>false</value>
+    <description>
+      A boolean that specifies whether a DN should overwrite the downstream
+      QOP in a write pipeline. This is used in the case where the client
+      talks to the first DN with one QOP, but inter-DN communication needs
+      to use a different QOP. If set to false, the default behaviour is that
+      inter-DN communication uses the same QOP as the client-DN connection.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.encrypt.data.overwrite.downstream.new.qop</name>
+    <value></value>
+    <description>
+      When dfs.encrypt.data.overwrite.downstream.derived.qop is set to true,
+      this configuration specifies the new QOP to be used to overwrite
+      the inter-DN QOP.
+    </description>
+  </property>
+
+  <property>
     <name>dfs.namenode.blockreport.queue.size</name>
     <value>1024</value>
     <description>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHAAuxiliaryPort.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHAAuxiliaryPort.java
index 867fbac..45ccefa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHAAuxiliaryPort.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHAAuxiliaryPort.java
@@ -37,7 +37,7 @@ import static org.junit.Assert.assertTrue;
  */
 public class TestHAAuxiliaryPort {
   @Test
-  public void testTest() throws Exception {
+  public void testHAAuxiliaryPort() throws Exception {
     Configuration conf = new Configuration();
     conf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY, "0,0");
     conf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY + ".ha-nn-uri-0.nn1",
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultipleNNPortQOP.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultipleNNPortQOP.java
new file mode 100644
index 0000000..ca84557
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultipleNNPortQOP.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.net.URI;
+import java.util.ArrayList;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferServer;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferTestCase;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SEND_QOP_ENABLED;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_NEW_QOP_KEY;
+import static org.junit.Assert.*;
+
+
+/**
+ * Tests accessing the NameNode on different ports, each configured with
+ * a different QOP.
+ */
+public class TestMultipleNNPortQOP extends SaslDataTransferTestCase {
+
+  private static final Path PATH1  = new Path("/file1");
+  private static final Path PATH2  = new Path("/file2");
+  private static final Path PATH3  = new Path("/file3");
+  private static final int BLOCK_SIZE = 4096;
+  private static final int NUM_BLOCKS = 3;
+
+  private static HdfsConfiguration clusterConf;
+
+  @Before
+  public void setup() throws Exception {
+    clusterConf = createSecureConfig(
+        "authentication,integrity,privacy");
+    clusterConf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY,
+        "12000,12100,12200");
+    // Explicitly set the service RPC address for the DataNodes. This is
+    // because DFSUtil.getNNServiceRpcAddressesForCluster looks up the
+    // client-facing port and the service port at the same time; if no
+    // service RPC address is set, it returns the client port, which in
+    // this case would be the auxiliary port for the DataNodes. That is not
+    // what the auxiliary port is for, so set the service RPC port here.
+    clusterConf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, "localhost:9020");
+    clusterConf.set(
+        CommonConfigurationKeys.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS,
+        "org.apache.hadoop.security.IngressPortBasedResolver");
+    clusterConf.set("ingress.port.sasl.configured.ports", "12000,12100,12200");
+    clusterConf.set("ingress.port.sasl.prop.12000", "authentication");
+    clusterConf.set("ingress.port.sasl.prop.12100", "integrity");
+    clusterConf.set("ingress.port.sasl.prop.12200", "privacy");
+    clusterConf.setBoolean(DFS_NAMENODE_SEND_QOP_ENABLED, true);
+  }
+
+  /**
+   * Test accessing NameNode from three different ports.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testMultipleNNPort() throws Exception {
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(clusterConf)
+          .numDataNodes(3).build();
+
+      cluster.waitActive();
+      HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf);
+      clientConf.unset(
+          CommonConfigurationKeys.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS);
+      ArrayList<DataNode> dataNodes = cluster.getDataNodes();
+
+      URI currentURI = cluster.getURI();
+      URI uriAuthPort = new URI(currentURI.getScheme() +
+          "://" + currentURI.getHost() + ":12000");
+      URI uriIntegrityPort = new URI(currentURI.getScheme() +
+          "://" + currentURI.getHost() + ":12100");
+      URI uriPrivacyPort = new URI(currentURI.getScheme() +
+          "://" + currentURI.getHost() + ":12200");
+
+      clientConf.set(HADOOP_RPC_PROTECTION, "privacy");
+      FileSystem fsPrivacy = FileSystem.get(uriPrivacyPort, clientConf);
+      doTest(fsPrivacy, PATH1);
+      for (DataNode dn : dataNodes) {
+        SaslDataTransferServer saslServer = dn.getSaslServer();
+        assertEquals("auth-conf", saslServer.getNegotiatedQOP());
+      }
+
+      clientConf.set(HADOOP_RPC_PROTECTION, "integrity");
+      FileSystem fsIntegrity = FileSystem.get(uriIntegrityPort, clientConf);
+      doTest(fsIntegrity, PATH2);
+      for (DataNode dn : dataNodes) {
+        SaslDataTransferServer saslServer = dn.getSaslServer();
+        assertEquals("auth-int", saslServer.getNegotiatedQOP());
+      }
+
+      clientConf.set(HADOOP_RPC_PROTECTION, "authentication");
+      FileSystem fsAuth = FileSystem.get(uriAuthPort, clientConf);
+      doTest(fsAuth, PATH3);
+      for (DataNode dn : dataNodes) {
+        SaslDataTransferServer saslServer = dn.getSaslServer();
+        assertEquals("auth", saslServer.getNegotiatedQOP());
+      }
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Test accessing NameNode from three different ports while
+   * overwriting the QOP of downstream DNs in the pipeline.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testMultipleNNPortOverwriteDownStream() throws Exception {
+    clusterConf.set(DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_NEW_QOP_KEY, "auth");
+    clusterConf.setBoolean(
+        DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_KEY, true);
+    MiniDFSCluster cluster = null;
+    try {
+      cluster =
+          new MiniDFSCluster.Builder(clusterConf).numDataNodes(3).build();
+      cluster.waitActive();
+      HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf);
+      clientConf.unset(
+          CommonConfigurationKeys.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS);
+      ArrayList<DataNode> dataNodes = cluster.getDataNodes();
+
+      URI currentURI = cluster.getURI();
+      URI uriAuthPort =
+          new URI(currentURI.getScheme() + "://" +
+              currentURI.getHost() + ":12000");
+      URI uriIntegrityPort =
+          new URI(currentURI.getScheme() + "://" +
+              currentURI.getHost() + ":12100");
+      URI uriPrivacyPort =
+          new URI(currentURI.getScheme() + "://" +
+              currentURI.getHost() + ":12200");
+
+      clientConf.set(HADOOP_RPC_PROTECTION, "privacy");
+      FileSystem fsPrivacy = FileSystem.get(uriPrivacyPort, clientConf);
+      doTest(fsPrivacy, PATH1);
+      // Wait so that the data has reached not only the first DN,
+      // but also the rest of the DNs.
+      Thread.sleep(100);
+      for (int i = 0; i < 2; i++) {
+        DataNode dn = dataNodes.get(i);
+        SaslDataTransferClient saslClient = dn.getSaslClient();
+        assertEquals("auth", saslClient.getTargetQOP());
+      }
+
+      clientConf.set(HADOOP_RPC_PROTECTION, "integrity");
+      FileSystem fsIntegrity = FileSystem.get(uriIntegrityPort, clientConf);
+      doTest(fsIntegrity, PATH2);
+      Thread.sleep(100);
+      for (int i = 0; i < 2; i++) {
+        DataNode dn = dataNodes.get(i);
+        SaslDataTransferClient saslClient = dn.getSaslClient();
+        assertEquals("auth", saslClient.getTargetQOP());
+      }
+
+      clientConf.set(HADOOP_RPC_PROTECTION, "authentication");
+      FileSystem fsAuth = FileSystem.get(uriAuthPort, clientConf);
+      doTest(fsAuth, PATH3);
+      Thread.sleep(100);
+      for (int i = 0; i < 3; i++) {
+        DataNode dn = dataNodes.get(i);
+        SaslDataTransferServer saslServer = dn.getSaslServer();
+        assertEquals("auth", saslServer.getNegotiatedQOP());
+      }
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  private void doTest(FileSystem fs, Path path) throws Exception {
+    FileSystemTestHelper.createFile(fs, path, NUM_BLOCKS, BLOCK_SIZE);
+    assertArrayEquals(FileSystemTestHelper.getFileData(NUM_BLOCKS, BLOCK_SIZE),
+        DFSTestUtil.readFile(fs, path).getBytes("UTF-8"));
+    BlockLocation[] blockLocations = fs.getFileBlockLocations(path, 0,
+        Long.MAX_VALUE);
+    assertNotNull(blockLocations);
+    assertEquals(NUM_BLOCKS, blockLocations.length);
+    for (BlockLocation blockLocation: blockLocations) {
+      assertNotNull(blockLocation.getHosts());
+      assertEquals(3, blockLocation.getHosts().length);
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverBackwardsCompat.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverBackwardsCompat.java
index 0f65269..8a11b0a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverBackwardsCompat.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverBackwardsCompat.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import com.google.protobuf.ByteString;
+import javax.crypto.SecretKey;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.net.*;
@@ -87,7 +88,7 @@ public class TestDataXceiverBackwardsCompat {
       doReturn(pair).when(saslClient).socketSend(
           any(Socket.class), any(OutputStream.class), any(InputStream.class),
           any(DataEncryptionKeyFactory.class), any(Token.class),
-          any(DatanodeID.class));
+          any(DatanodeID.class), any(SecretKey.class));
       doReturn(mock(ReplicaHandler.class)).when(data).createTemporary(
           any(StorageType.class), any(String.class), any(ExtendedBlock.class),
           anyBoolean());


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org