You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cl...@apache.org on 2019/08/16 17:59:31 UTC
[hadoop] 03/04: HDFS-13699. Add DFSClient sending handshake token
to DataNode,
and allow DataNode overwrite downstream QOP. Contributed by Chen Liang.
This is an automated email from the ASF dual-hosted git repository.
cliang pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit f507bc059d6cc22a2eb04459251f23d8bf89535c
Author: Chen Liang <cl...@apache.org>
AuthorDate: Fri Apr 12 17:37:51 2019 -0700
HDFS-13699. Add DFSClient sending handshake token to DataNode, and allow DataNode overwrite downstream QOP. Contributed by Chen Liang.
---
.../hadoop/security/token/SecretManager.java | 2 +-
.../hadoop/hdfs/client/HdfsClientConfigKeys.java | 3 +
.../datatransfer/sasl/DataTransferSaslUtil.java | 72 +++++++
.../datatransfer/sasl/SaslDataTransferClient.java | 117 +++++++++--
.../src/main/proto/datatransfer.proto | 6 +
.../java/org/apache/hadoop/hdfs/DFSConfigKeys.java | 4 +
.../datatransfer/sasl/SaslDataTransferServer.java | 53 ++++-
.../apache/hadoop/hdfs/server/datanode/DNConf.java | 6 +
.../hadoop/hdfs/server/datanode/DataNode.java | 7 +-
.../hadoop/hdfs/server/datanode/DataXceiver.java | 14 +-
.../src/main/resources/hdfs-default.xml | 22 +++
.../apache/hadoop/hdfs/TestHAAuxiliaryPort.java | 2 +-
.../apache/hadoop/hdfs/TestMultipleNNPortQOP.java | 219 +++++++++++++++++++++
.../datanode/TestDataXceiverBackwardsCompat.java | 3 +-
14 files changed, 502 insertions(+), 28 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/SecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/SecretManager.java
index 798c8c9..514806d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/SecretManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/SecretManager.java
@@ -167,7 +167,7 @@ public abstract class SecretManager<T extends TokenIdentifier> {
* @param key the secret key
* @return the bytes of the generated password
*/
- protected static byte[] createPassword(byte[] identifier,
+ public static byte[] createPassword(byte[] identifier,
SecretKey key) {
Mac mac = threadLocalMac.get();
try {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index 55b1722..3d934b4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -157,6 +157,9 @@ public interface HdfsClientConfigKeys {
String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY =
"dfs.encrypt.data.transfer.cipher.suites";
+ String DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_NEW_QOP_KEY =
+ "dfs.encrypt.data.overwrite.downstream.new.qop";
+
String DFS_DATA_TRANSFER_PROTECTION_KEY = "dfs.data.transfer.protection";
String DFS_DATA_TRANSFER_PROTECTION_DEFAULT = "";
String DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY =
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java
index f4651eb..666a29f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.HandshakeSecretProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CipherOptionProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.security.SaslPropertiesResolver;
@@ -249,6 +250,51 @@ public final class DataTransferSaslUtil {
}
}
+ /**
+ * Immutable holder for the pieces of one received SASL negotiation
+ * message: the SASL payload, plus the handshake secret and block pool id
+ * that may accompany it. The byte arrays are stored and returned as-is,
+ * without defensive copies.
+ */
+ static class SaslMessageWithHandshake {
+ private final byte[] payload;
+ // secret and bpid are left null by readSaslMessageWithHandshakeSecret
+ // when the message carried no handshake secret.
+ private final byte[] secret;
+ private final String bpid;
+
+ SaslMessageWithHandshake(byte[] payload, byte[] secret, String bpid) {
+ this.payload = payload;
+ this.secret = secret;
+ this.bpid = bpid;
+ }
+
+ /** @return the SASL payload bytes of the message. */
+ byte[] getPayload() {
+ return payload;
+ }
+
+ /** @return the handshake secret, possibly null. */
+ byte[] getSecret() {
+ return secret;
+ }
+
+ /** @return the block pool id sent with the secret, possibly null. */
+ String getBpid() {
+ return bpid;
+ }
+ }
+
+ /**
+ * Reads one DataTransferEncryptorMessageProto from the stream and returns
+ * its payload together with the handshake secret and block pool id, if the
+ * message carries them.
+ *
+ * @param in stream to read the message from
+ * @return the payload, plus secret and bpid (both null when the message has
+ * no handshakeSecret field)
+ * @throws InvalidEncryptionKeyException if the message status is
+ * ERROR_UNKNOWN_KEY
+ * @throws IOException if the message status is ERROR, or for any error
+ * raised while reading or parsing the message
+ */
+ public static SaslMessageWithHandshake readSaslMessageWithHandshakeSecret(
+ InputStream in) throws IOException {
+ DataTransferEncryptorMessageProto proto =
+ DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in));
+ if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
+ throw new InvalidEncryptionKeyException(proto.getMessage());
+ } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
+ throw new IOException(proto.getMessage());
+ } else {
+ byte[] payload = proto.getPayload().toByteArray();
+ // The handshake secret is an optional field; leave both values null
+ // when the sender did not include it.
+ byte[] secret = null;
+ String bpid = null;
+ if (proto.hasHandshakeSecret()) {
+ HandshakeSecretProto handshakeSecret = proto.getHandshakeSecret();
+ secret = handshakeSecret.getSecret().toByteArray();
+ bpid = handshakeSecret.getBpid();
+ }
+ return new SaslMessageWithHandshake(payload, secret, bpid);
+ }
+ }
+
/**
* Negotiate a cipher option which server supports.
*
@@ -375,6 +421,12 @@ public final class DataTransferSaslUtil {
sendSaslMessage(out, DataTransferEncryptorStatus.SUCCESS, payload, null);
}
+ /**
+ * Sends a SASL negotiation message that also carries a handshake secret,
+ * using status SUCCESS and no message text.
+ *
+ * @param out stream to write the message to
+ * @param payload SASL payload bytes
+ * @param secret handshake secret to attach
+ * @param bpid block pool id to attach alongside the secret
+ * @throws IOException for any error
+ */
+ public static void sendSaslMessageHandshakeSecret(OutputStream out,
+ byte[] payload, byte[] secret, String bpid) throws IOException {
+ sendSaslMessageHandshakeSecret(out, DataTransferEncryptorStatus.SUCCESS,
+ payload, null, secret, bpid);
+ }
+
/**
* Send a SASL negotiation message and negotiation cipher options to server.
*
@@ -497,6 +549,13 @@ public final class DataTransferSaslUtil {
public static void sendSaslMessage(OutputStream out,
DataTransferEncryptorStatus status, byte[] payload, String message)
throws IOException {
+ sendSaslMessage(out, status, payload, message, null);
+ }
+
+ public static void sendSaslMessage(OutputStream out,
+ DataTransferEncryptorStatus status, byte[] payload, String message,
+ HandshakeSecretProto handshakeSecret)
+ throws IOException {
DataTransferEncryptorMessageProto.Builder builder =
DataTransferEncryptorMessageProto.newBuilder();
@@ -507,12 +566,25 @@ public final class DataTransferSaslUtil {
if (message != null) {
builder.setMessage(message);
}
+ if (handshakeSecret != null) {
+ builder.setHandshakeSecret(handshakeSecret);
+ }
DataTransferEncryptorMessageProto proto = builder.build();
proto.writeDelimitedTo(out);
out.flush();
}
+ /**
+ * Wraps the given secret and block pool id in a HandshakeSecretProto and
+ * sends it as part of a SASL negotiation message.
+ *
+ * @param out stream to write the message to
+ * @param status negotiation status to send
+ * @param payload SASL payload bytes
+ * @param message message text, passed through to sendSaslMessage
+ * @param secret handshake secret; a required field of HandshakeSecretProto
+ * @param bpid block pool id; a required field of HandshakeSecretProto
+ * @throws IOException for any error
+ */
+ public static void sendSaslMessageHandshakeSecret(OutputStream out,
+ DataTransferEncryptorStatus status, byte[] payload, String message,
+ byte[] secret, String bpid) throws IOException {
+ HandshakeSecretProto.Builder builder =
+ HandshakeSecretProto.newBuilder();
+ builder.setSecret(ByteString.copyFrom(secret));
+ builder.setBpid(bpid);
+ sendSaslMessage(out, status, payload, message, builder.build());
+ }
+
/**
* There is no reason to instantiate this class.
*/
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java
index a23a108..8d1c7f6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java
@@ -18,8 +18,11 @@
package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_NEW_QOP_KEY;
import static org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.*;
+import com.google.common.annotations.VisibleForTesting;
+import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -31,6 +34,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
+import javax.crypto.SecretKey;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
@@ -39,6 +43,7 @@ import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.sasl.RealmCallback;
import javax.security.sasl.RealmChoiceCallback;
+import javax.security.sasl.Sasl;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@@ -54,6 +59,7 @@ import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.security.SaslPropertiesResolver;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,6 +89,10 @@ public class SaslDataTransferClient {
private final SaslPropertiesResolver saslPropsResolver;
private final TrustedChannelResolver trustedChannelResolver;
+ // Stores the QOP most recently requested by getSaslStreams (it is set
+ // before the handshake runs); for testing purposes only
+ private String targetQOP;
+
/**
* Creates a new SaslDataTransferClient. This constructor is used in cases
* where it is not relevant to track if a secure client did a fallback to
@@ -140,7 +150,7 @@ public class SaslDataTransferClient {
DataEncryptionKey encryptionKey = !trustedChannelResolver.isTrusted() ?
encryptionKeyFactory.newDataEncryptionKey() : null;
IOStreamPair ios = send(socket.getInetAddress(), underlyingOut,
- underlyingIn, encryptionKey, accessToken, datanodeId);
+ underlyingIn, encryptionKey, accessToken, datanodeId, null);
return ios != null ? ios : new IOStreamPair(underlyingIn, underlyingOut);
}
@@ -180,8 +190,19 @@ public class SaslDataTransferClient {
InputStream underlyingIn, DataEncryptionKeyFactory encryptionKeyFactory,
Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
throws IOException {
+ return socketSend(socket, underlyingOut, underlyingIn, encryptionKeyFactory,
+ accessToken, datanodeId, null);
+ }
+
+ public IOStreamPair socketSend(
+ Socket socket, OutputStream underlyingOut, InputStream underlyingIn,
+ DataEncryptionKeyFactory encryptionKeyFactory,
+ Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId,
+ SecretKey secretKey)
+ throws IOException {
IOStreamPair ios = checkTrustAndSend(socket.getInetAddress(), underlyingOut,
- underlyingIn, encryptionKeyFactory, accessToken, datanodeId);
+ underlyingIn, encryptionKeyFactory, accessToken, datanodeId,
+ secretKey);
return ios != null ? ios : new IOStreamPair(underlyingIn, underlyingOut);
}
@@ -203,17 +224,26 @@ public class SaslDataTransferClient {
DataEncryptionKeyFactory encryptionKeyFactory,
Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
throws IOException {
+ return checkTrustAndSend(addr, underlyingOut, underlyingIn,
+ encryptionKeyFactory, accessToken, datanodeId, null);
+ }
+
+ private IOStreamPair checkTrustAndSend(
+ InetAddress addr, OutputStream underlyingOut, InputStream underlyingIn,
+ DataEncryptionKeyFactory encryptionKeyFactory,
+ Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId,
+ SecretKey secretKey)
+ throws IOException {
boolean localTrusted = trustedChannelResolver.isTrusted();
boolean remoteTrusted = trustedChannelResolver.isTrusted(addr);
- LOG.debug("SASL encryption trust check: localHostTrusted = {}, "
+ LOG.info("SASL encryption trust check: localHostTrusted = {}, "
+ "remoteHostTrusted = {}", localTrusted, remoteTrusted);
-
if (!localTrusted || !remoteTrusted) {
// The encryption key factory only returns a key if encryption is enabled.
- DataEncryptionKey encryptionKey = encryptionKeyFactory
- .newDataEncryptionKey();
+ DataEncryptionKey encryptionKey =
+ encryptionKeyFactory.newDataEncryptionKey();
return send(addr, underlyingOut, underlyingIn, encryptionKey, accessToken,
- datanodeId);
+ datanodeId, secretKey);
} else {
LOG.debug(
"SASL client skipping handshake on trusted connection for addr = {}, "
@@ -237,13 +267,14 @@ public class SaslDataTransferClient {
*/
private IOStreamPair send(InetAddress addr, OutputStream underlyingOut,
InputStream underlyingIn, DataEncryptionKey encryptionKey,
- Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
+ Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId,
+ SecretKey secretKey)
throws IOException {
if (encryptionKey != null) {
LOG.debug("SASL client doing encrypted handshake for addr = {}, "
+ "datanodeId = {}", addr, datanodeId);
return getEncryptedStreams(addr, underlyingOut, underlyingIn,
- encryptionKey);
+ encryptionKey, accessToken, secretKey);
} else if (!UserGroupInformation.isSecurityEnabled()) {
LOG.debug("SASL client skipping handshake in unsecured configuration for "
+ "addr = {}, datanodeId = {}", addr, datanodeId);
@@ -264,7 +295,8 @@ public class SaslDataTransferClient {
LOG.debug(
"SASL client doing general handshake for addr = {}, datanodeId = {}",
addr, datanodeId);
- return getSaslStreams(addr, underlyingOut, underlyingIn, accessToken);
+ return getSaslStreams(addr, underlyingOut, underlyingIn,
+ accessToken, secretKey);
} else {
// It's a secured cluster using non-privileged ports, but no SASL. The
// only way this can happen is if the DataNode has
@@ -287,11 +319,20 @@ public class SaslDataTransferClient {
* @throws IOException for any error
*/
private IOStreamPair getEncryptedStreams(InetAddress addr,
- OutputStream underlyingOut,
- InputStream underlyingIn, DataEncryptionKey encryptionKey)
+ OutputStream underlyingOut, InputStream underlyingIn,
+ DataEncryptionKey encryptionKey,
+ Token<BlockTokenIdentifier> accessToken,
+ SecretKey secretKey)
throws IOException {
Map<String, String> saslProps = createSaslPropertiesForEncryption(
encryptionKey.encryptionAlgorithm);
+ if (secretKey != null) {
+ LOG.debug("DataNode overwriting downstream QOP" +
+ saslProps.get(Sasl.QOP));
+ byte[] newSecret = SecretManager.createPassword(saslProps.get(Sasl.QOP)
+ .getBytes(Charsets.UTF_8), secretKey);
+ accessToken.setDNHandshakeSecret(newSecret);
+ }
LOG.debug("Client using encryption algorithm {}",
encryptionKey.encryptionAlgorithm);
@@ -301,7 +342,7 @@ public class SaslDataTransferClient {
CallbackHandler callbackHandler = new SaslClientCallbackHandler(userName,
password);
return doSaslHandshake(addr, underlyingOut, underlyingIn, userName,
- saslProps, callbackHandler);
+ saslProps, callbackHandler, accessToken);
}
/**
@@ -370,6 +411,11 @@ public class SaslDataTransferClient {
}
}
+ /**
+ * Returns the QOP value most recently recorded by getSaslStreams, which
+ * stores the Sasl.QOP entry of the SASL properties before starting the
+ * handshake. Exposed for tests only.
+ *
+ * @return the recorded QOP string, or null if none has been recorded
+ */
+ @VisibleForTesting
+ public String getTargetQOP() {
+ return targetQOP;
+ }
+
/**
* Sends client SASL negotiation for general-purpose handshake.
*
@@ -382,16 +428,36 @@ public class SaslDataTransferClient {
*/
private IOStreamPair getSaslStreams(InetAddress addr,
OutputStream underlyingOut, InputStream underlyingIn,
- Token<BlockTokenIdentifier> accessToken)
+ Token<BlockTokenIdentifier> accessToken,
+ SecretKey secretKey)
throws IOException {
Map<String, String> saslProps = saslPropsResolver.getClientProperties(addr);
+ // secretKey != null only happens when this is called by a DN that is
+ // sending to a downstream DN. If called from a client, it will be null,
+ // as the client has no key with which to generate a MAC.
+ // Therefore, if a different QOP is desired for inter-DN communication,
+ // the block below uses the new QOP to create a secret that encodes
+ // the new QOP.
+ if (secretKey != null) {
+ String newQOP = conf
+ .get(DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_NEW_QOP_KEY);
+ if (newQOP != null) {
+ saslProps.put(Sasl.QOP, newQOP);
+ }
+ LOG.debug("DataNode overwriting downstream QOP " +
+ saslProps.get(Sasl.QOP));
+ byte[] newSecret = SecretManager.createPassword(
+ saslProps.get(Sasl.QOP).getBytes(Charsets.UTF_8), secretKey);
+ accessToken.setDNHandshakeSecret(newSecret);
+ }
+ targetQOP = saslProps.get(Sasl.QOP);
String userName = buildUserName(accessToken);
char[] password = buildClientPassword(accessToken);
CallbackHandler callbackHandler = new SaslClientCallbackHandler(userName,
password);
return doSaslHandshake(addr, underlyingOut, underlyingIn, userName,
- saslProps, callbackHandler);
+ saslProps, callbackHandler, accessToken);
}
/**
@@ -435,8 +501,8 @@ public class SaslDataTransferClient {
*/
private IOStreamPair doSaslHandshake(InetAddress addr,
OutputStream underlyingOut, InputStream underlyingIn, String userName,
- Map<String, String> saslProps,
- CallbackHandler callbackHandler) throws IOException {
+ Map<String, String> saslProps, CallbackHandler callbackHandler,
+ Token<BlockTokenIdentifier> accessToken) throws IOException {
DataOutputStream out = new DataOutputStream(underlyingOut);
DataInputStream in = new DataInputStream(underlyingIn);
@@ -449,7 +515,22 @@ public class SaslDataTransferClient {
try {
// Start of handshake - "initial response" in SASL terminology.
- sendSaslMessage(out, new byte[0]);
+ // The handshake secret can be null, this happens when client is running
+ // a new version but the cluster does not have this feature. In which case
+ // there will be no encrypted secret sent from NN.
+ byte[] handshakeSecret = accessToken.getDnHandshakeSecret();
+ if (handshakeSecret == null || handshakeSecret.length == 0) {
+ LOG.debug("Handshake secret is null, sending without "
+ + "handshake secret.");
+ sendSaslMessage(out, new byte[0]);
+ } else {
+ LOG.debug("Sending handshake secret.");
+ BlockTokenIdentifier identifier = new BlockTokenIdentifier();
+ identifier.readFields(new DataInputStream(
+ new ByteArrayInputStream(accessToken.getIdentifier())));
+ String bpid = identifier.getBlockPoolId();
+ sendSaslMessageHandshakeSecret(out, new byte[0], handshakeSecret, bpid);
+ }
// step 1
byte[] remoteResponse = readSaslMessage(in);
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
index 384da54..43a03e9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
@@ -43,6 +43,12 @@ message DataTransferEncryptorMessageProto {
optional bytes payload = 2;
optional string message = 3;
repeated CipherOptionProto cipherOption = 4;
+ optional HandshakeSecretProto handshakeSecret = 5;
+}
+
+message HandshakeSecretProto {
+ required bytes secret = 1;
+ required string bpid = 2;
}
message BaseHeaderProto {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index b05c08d..7466021 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -1006,6 +1006,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
// Security-related configs
public static final String DFS_ENCRYPT_DATA_TRANSFER_KEY = "dfs.encrypt.data.transfer";
+ public static final String DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_KEY =
+ "dfs.encrypt.data.overwrite.downstream.derived.qop";
+ public static final boolean DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_DEFAULT =
+ false;
public static final boolean DFS_ENCRYPT_DATA_TRANSFER_DEFAULT = false;
public static final String DFS_XFRAME_OPTION_ENABLED = "dfs.xframe.enabled";
public static final boolean DFS_XFRAME_OPTION_ENABLED_DEFAULT = true;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java
index e3a72d0..d162d9e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java
@@ -21,15 +21,18 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSF
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
import static org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.*;
+import com.google.common.annotations.VisibleForTesting;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import javax.crypto.SecretKey;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
@@ -37,6 +40,7 @@ import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.sasl.AuthorizeCallback;
import javax.security.sasl.RealmCallback;
+import javax.security.sasl.Sasl;
import javax.security.sasl.SaslException;
import org.apache.commons.codec.binary.Base64;
@@ -48,12 +52,15 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
+import org.apache.hadoop.hdfs.security.token.block.BlockKey;
import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.DNConf;
import org.apache.hadoop.security.SaslPropertiesResolver;
+import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -78,6 +85,10 @@ public class SaslDataTransferServer {
private final BlockPoolTokenSecretManager blockPoolTokenSecretManager;
private final DNConf dnConf;
+ // Store the most recent successfully negotiated QOP,
+ // for testing purpose only
+ private String negotiatedQOP;
+
/**
* Creates a new SaslDataTransferServer.
*
@@ -337,6 +348,26 @@ public class SaslDataTransferServer {
return identifier;
}
+ /**
+ * Determines which QOP a handshake secret encodes. For every known QOP,
+ * the password derived from the QOP string and the block pool's current
+ * block key is recomputed and compared with the received secret.
+ *
+ * @param secret handshake secret received from the peer
+ * @param bpid block pool id used to look up the current block key
+ * @return the SASL QOP string whose derived password equals the secret,
+ * or null if no QOP matches
+ */
+ private String examineSecret(byte[] secret, String bpid) {
+ BlockKey blockKey = blockPoolTokenSecretManager.get(bpid).getCurrentKey();
+ SecretKey secretKey = blockKey.getKey();
+ for (SaslRpcServer.QualityOfProtection qop :
+ SaslRpcServer.QualityOfProtection.values()) {
+ String qopString = qop.getSaslQop();
+ byte[] data = qopString.getBytes(Charsets.UTF_8);
+ byte[] encryptedData = SecretManager.createPassword(data, secretKey);
+ // The secret comes off the wire, so compare in constant time; the early
+ // exit of Arrays.equals would leak how many leading bytes matched.
+ if (java.security.MessageDigest.isEqual(encryptedData, secret)) {
+ return qopString;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Returns the QOP recorded by the most recent server-side handshake that
+ * reached the point where the negotiated QOP is read from the SASL
+ * participant. Exposed for tests only.
+ *
+ * @return the recorded QOP string, or null if none has been recorded
+ */
+ @VisibleForTesting
+ public String getNegotiatedQOP() {
+ return negotiatedQOP;
+ }
+
/**
* This method actually executes the server-side SASL handshake.
*
@@ -355,9 +386,6 @@ public class SaslDataTransferServer {
DataInputStream in = new DataInputStream(underlyingIn);
DataOutputStream out = new DataOutputStream(underlyingOut);
- SaslParticipant sasl = SaslParticipant.createServerSaslParticipant(saslProps,
- callbackHandler);
-
int magicNumber = in.readInt();
if (magicNumber != SASL_TRANSFER_MAGIC_NUMBER) {
throw new InvalidMagicNumberException(magicNumber,
@@ -365,7 +393,23 @@ public class SaslDataTransferServer {
}
try {
// step 1
- byte[] remoteResponse = readSaslMessage(in);
+ SaslMessageWithHandshake message = readSaslMessageWithHandshakeSecret(in);
+ byte[] secret = message.getSecret();
+ String bpid = message.getBpid();
+ if (secret != null || bpid != null) {
+ // sanity check: if one is non-null, the other must be non-null as well
+ assert(secret != null && bpid != null);
+ String qop = examineSecret(secret, bpid);
+ if (qop != null) {
+ saslProps.put(Sasl.QOP, qop);
+ } else {
+ LOG.error("Unable to match secret to a QOP!");
+ }
+ }
+ SaslParticipant sasl = SaslParticipant.createServerSaslParticipant(
+ saslProps, callbackHandler);
+
+ byte[] remoteResponse = message.getPayload();
byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
sendSaslMessage(out, localResponse);
@@ -379,6 +423,7 @@ public class SaslDataTransferServer {
checkSaslComplete(sasl, saslProps);
CipherOption cipherOption = null;
+ negotiatedQOP = sasl.getNegotiatedQop();
if (sasl.isNegotiatedQopPrivacy()) {
// Negotiate a cipher option
Configuration conf = dnConf.getConf();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
index 4177e0e..155b800 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
@@ -32,6 +32,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_P
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
@@ -89,6 +91,7 @@ public class DNConf {
final boolean syncOnClose;
final boolean encryptDataTransfer;
final boolean connectToDnViaHostname;
+ final boolean overwriteDownstreamDerivedQOP;
final long readaheadLength;
final long heartBeatInterval;
@@ -239,6 +242,9 @@ public class DNConf {
this.encryptDataTransfer = getConf().getBoolean(
DFS_ENCRYPT_DATA_TRANSFER_KEY,
DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
+ this.overwriteDownstreamDerivedQOP = getConf().getBoolean(
+ DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_KEY,
+ DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_DEFAULT);
this.encryptionAlgorithm = getConf().get(DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
this.trustedChannelResolver = TrustedChannelResolver.getInstance(getConf());
this.saslPropsResolver = DataTransferSaslUtil.getSaslPropertiesResolver(
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 3b8006a..55dacd2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1781,7 +1781,12 @@ public class DataNode extends ReconfigurableBase
public int getXferPort() {
return streamingAddr.getPort();
}
-
+
+ /**
+ * @return the SaslDataTransferServer held by this DataNode; exposed for
+ * tests only
+ */
+ @VisibleForTesting
+ public SaslDataTransferServer getSaslServer() {
+ return saslServer;
+ }
+
/**
* @return name useful for logging
*/
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 53d650d..55849f8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
+import javax.crypto.SecretKey;
import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSUtilClient;
@@ -47,6 +48,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCirc
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.security.token.block.BlockKey;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.BlockChecksumComputer;
import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.AbstractBlockChecksumComputer;
@@ -798,8 +800,16 @@ class DataXceiver extends Receiver implements Runnable {
InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock);
DataEncryptionKeyFactory keyFactory =
datanode.getDataEncryptionKeyFactoryForBlock(block);
- IOStreamPair saslStreams = datanode.saslClient.socketSend(mirrorSock,
- unbufMirrorOut, unbufMirrorIn, keyFactory, blockToken, targets[0]);
+ SecretKey secretKey = null;
+ if (dnConf.overwriteDownstreamDerivedQOP) {
+ String bpid = block.getBlockPoolId();
+ BlockKey blockKey = datanode.blockPoolTokenSecretManager
+ .get(bpid).getCurrentKey();
+ secretKey = blockKey.getKey();
+ }
+ IOStreamPair saslStreams = datanode.saslClient.socketSend(
+ mirrorSock, unbufMirrorOut, unbufMirrorIn, keyFactory,
+ blockToken, targets[0], secretKey);
unbufMirrorOut = saslStreams.out;
unbufMirrorIn = saslStreams.in;
mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 165c804..1091403 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -5247,6 +5247,28 @@
</property>
<property>
+ <name>dfs.encrypt.data.overwrite.downstream.derived.qop</name>
+ <value>false</value>
+ <description>
+ A boolean that specifies whether a DN should overwrite the downstream
+ QOP in a write pipeline. This is used in the case where the client
+ talks to the first DN with one QOP, but inter-DN communication needs to
+ use a different QOP. If set to false (the default), inter-DN
+ communication will use the same QOP as the client-DN connection.
+ </description>
+ </property>
+
+ <property>
+ <name>dfs.encrypt.data.overwrite.downstream.new.qop</name>
+ <value></value>
+ <description>
+ When dfs.encrypt.data.overwrite.downstream.derived.qop is set to true,
+ this configuration specifies the new QOP to be used to overwrite
+ the inter-DN QOP.
+ </description>
+ </property>
+
+ <property>
<name>dfs.namenode.blockreport.queue.size</name>
<value>1024</value>
<description>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHAAuxiliaryPort.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHAAuxiliaryPort.java
index 867fbac..45ccefa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHAAuxiliaryPort.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHAAuxiliaryPort.java
@@ -37,7 +37,7 @@ import static org.junit.Assert.assertTrue;
*/
public class TestHAAuxiliaryPort {
@Test
- public void testTest() throws Exception {
+ public void testHAAuxiliaryPort() throws Exception {
Configuration conf = new Configuration();
conf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY, "0,0");
conf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY + ".ha-nn-uri-0.nn1",
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultipleNNPortQOP.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultipleNNPortQOP.java
new file mode 100644
index 0000000..ca84557
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultipleNNPortQOP.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.net.URI;
+import java.util.ArrayList;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferServer;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferTestCase;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SEND_QOP_ENABLED;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_NEW_QOP_KEY;
+import static org.junit.Assert.*;
+
+
+/**
+ * This test verifies access to the NameNode on different ports, each
+ * configured with a different QOP.
+ */
+public class TestMultipleNNPortQOP extends SaslDataTransferTestCase {
+
+ private static final Path PATH1 = new Path("/file1");
+ private static final Path PATH2 = new Path("/file2");
+ private static final Path PATH3 = new Path("/file3");
+ private static final int BLOCK_SIZE = 4096;
+ private static final int NUM_BLOCKS = 3;
+
+ private static HdfsConfiguration clusterConf;
+
+ @Before
+ public void setup() throws Exception {
+ clusterConf = createSecureConfig(
+ "authentication,integrity,privacy");
+ clusterConf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY,
+ "12000,12100,12200");
+ // Explicitly set the service RPC address for the DataNodes. This is
+ // because DFSUtil.getNNServiceRpcAddressesForCluster looks up the
+ // client-facing port and the service port at the same time; if no
+ // service RPC address is set, it would return the client port, which in
+ // this case would be the auxiliary port for the DataNode. That is not
+ // what the auxiliary port is for, so set the service RPC port to avoid it.
+ clusterConf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, "localhost:9020");
+ clusterConf.set(
+ CommonConfigurationKeys.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS,
+ "org.apache.hadoop.security.IngressPortBasedResolver");
+ clusterConf.set("ingress.port.sasl.configured.ports", "12000,12100,12200");
+ clusterConf.set("ingress.port.sasl.prop.12000", "authentication");
+ clusterConf.set("ingress.port.sasl.prop.12100", "integrity");
+ clusterConf.set("ingress.port.sasl.prop.12200", "privacy");
+ clusterConf.setBoolean(DFS_NAMENODE_SEND_QOP_ENABLED, true);
+ }
+
+ /**
+ * Test accessing NameNode from three different ports.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testMultipleNNPort() throws Exception {
+ MiniDFSCluster cluster = null;
+ try {
+ cluster = new MiniDFSCluster.Builder(clusterConf)
+ .numDataNodes(3).build();
+
+ cluster.waitActive();
+ HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf);
+ clientConf.unset(
+ CommonConfigurationKeys.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS);
+ ArrayList<DataNode> dataNodes = cluster.getDataNodes();
+
+ URI currentURI = cluster.getURI();
+ URI uriAuthPort = new URI(currentURI.getScheme() +
+ "://" + currentURI.getHost() + ":12000");
+ URI uriIntegrityPort = new URI(currentURI.getScheme() +
+ "://" + currentURI.getHost() + ":12100");
+ URI uriPrivacyPort = new URI(currentURI.getScheme() +
+ "://" + currentURI.getHost() + ":12200");
+
+ clientConf.set(HADOOP_RPC_PROTECTION, "privacy");
+ FileSystem fsPrivacy = FileSystem.get(uriPrivacyPort, clientConf);
+ doTest(fsPrivacy, PATH1);
+ for (DataNode dn : dataNodes) {
+ SaslDataTransferServer saslServer = dn.getSaslServer();
+ assertEquals("auth-conf", saslServer.getNegotiatedQOP());
+ }
+
+ clientConf.set(HADOOP_RPC_PROTECTION, "integrity");
+ FileSystem fsIntegrity = FileSystem.get(uriIntegrityPort, clientConf);
+ doTest(fsIntegrity, PATH2);
+ for (DataNode dn : dataNodes) {
+ SaslDataTransferServer saslServer = dn.getSaslServer();
+ assertEquals("auth-int", saslServer.getNegotiatedQOP());
+ }
+
+ clientConf.set(HADOOP_RPC_PROTECTION, "authentication");
+ FileSystem fsAuth = FileSystem.get(uriAuthPort, clientConf);
+ doTest(fsAuth, PATH3);
+ for (DataNode dn : dataNodes) {
+ SaslDataTransferServer saslServer = dn.getSaslServer();
+ assertEquals("auth", saslServer.getNegotiatedQOP());
+ }
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ /**
+ * Test accessing NameNode from three different ports while overwriting
+ * the downstream QOP between DNs in the write pipeline.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testMultipleNNPortOverwriteDownStream() throws Exception {
+ clusterConf.set(DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_NEW_QOP_KEY, "auth");
+ clusterConf.setBoolean(
+ DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_KEY, true);
+ MiniDFSCluster cluster = null;
+ try {
+ cluster =
+ new MiniDFSCluster.Builder(clusterConf).numDataNodes(3).build();
+ cluster.waitActive();
+ HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf);
+ clientConf.unset(
+ CommonConfigurationKeys.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS);
+ ArrayList<DataNode> dataNodes = cluster.getDataNodes();
+
+ URI currentURI = cluster.getURI();
+ URI uriAuthPort =
+ new URI(currentURI.getScheme() + "://" +
+ currentURI.getHost() + ":12000");
+ URI uriIntegrityPort =
+ new URI(currentURI.getScheme() + "://" +
+ currentURI.getHost() + ":12100");
+ URI uriPrivacyPort =
+ new URI(currentURI.getScheme() + "://" +
+ currentURI.getHost() + ":12200");
+
+ clientConf.set(HADOOP_RPC_PROTECTION, "privacy");
+ FileSystem fsPrivacy = FileSystem.get(uriPrivacyPort, clientConf);
+ doTest(fsPrivacy, PATH1);
+ // add a wait so that data has reached not only first DN,
+ // but also the rest
+ Thread.sleep(100);
+ for (int i = 0; i < 2; i++) {
+ DataNode dn = dataNodes.get(i);
+ SaslDataTransferClient saslClient = dn.getSaslClient();
+ assertEquals("auth", saslClient.getTargetQOP());
+ }
+
+ clientConf.set(HADOOP_RPC_PROTECTION, "integrity");
+ FileSystem fsIntegrity = FileSystem.get(uriIntegrityPort, clientConf);
+ doTest(fsIntegrity, PATH2);
+ Thread.sleep(100);
+ for (int i = 0; i < 2; i++) {
+ DataNode dn = dataNodes.get(i);
+ SaslDataTransferClient saslClient = dn.getSaslClient();
+ assertEquals("auth", saslClient.getTargetQOP());
+ }
+
+ clientConf.set(HADOOP_RPC_PROTECTION, "authentication");
+ FileSystem fsAuth = FileSystem.get(uriAuthPort, clientConf);
+ doTest(fsAuth, PATH3);
+ Thread.sleep(100);
+ for (int i = 0; i < 3; i++) {
+ DataNode dn = dataNodes.get(i);
+ SaslDataTransferServer saslServer = dn.getSaslServer();
+ assertEquals("auth", saslServer.getNegotiatedQOP());
+ }
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ private void doTest(FileSystem fs, Path path) throws Exception {
+ FileSystemTestHelper.createFile(fs, path, NUM_BLOCKS, BLOCK_SIZE);
+ assertArrayEquals(FileSystemTestHelper.getFileData(NUM_BLOCKS, BLOCK_SIZE),
+ DFSTestUtil.readFile(fs, path).getBytes("UTF-8"));
+ BlockLocation[] blockLocations = fs.getFileBlockLocations(path, 0,
+ Long.MAX_VALUE);
+ assertNotNull(blockLocations);
+ assertEquals(NUM_BLOCKS, blockLocations.length);
+ for (BlockLocation blockLocation: blockLocations) {
+ assertNotNull(blockLocation.getHosts());
+ assertEquals(3, blockLocation.getHosts().length);
+ }
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverBackwardsCompat.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverBackwardsCompat.java
index 0f65269..8a11b0a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverBackwardsCompat.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverBackwardsCompat.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hdfs.server.datanode;
import com.google.protobuf.ByteString;
+import javax.crypto.SecretKey;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.net.*;
@@ -87,7 +88,7 @@ public class TestDataXceiverBackwardsCompat {
doReturn(pair).when(saslClient).socketSend(
any(Socket.class), any(OutputStream.class), any(InputStream.class),
any(DataEncryptionKeyFactory.class), any(Token.class),
- any(DatanodeID.class));
+ any(DatanodeID.class), any(SecretKey.class));
doReturn(mock(ReplicaHandler.class)).when(data).createTemporary(
any(StorageType.class), any(String.class), any(ExtendedBlock.class),
anyBoolean());
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org