You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by yl...@apache.org on 2014/10/28 14:19:15 UTC
git commit: HDFS-6606. Optimize HDFS Encrypted Transport performance.
(yliu)
Repository: hadoop
Updated Branches:
refs/heads/trunk c9bec46c9 -> 58c0bb9ed
HDFS-6606. Optimize HDFS Encrypted Transport performance. (yliu)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/58c0bb9e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/58c0bb9e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/58c0bb9e
Branch: refs/heads/trunk
Commit: 58c0bb9ed9f4a2491395b63c68046562a73526c9
Parents: c9bec46
Author: yliu <yl...@apache.org>
Authored: Tue Oct 28 21:11:31 2014 +0800
Committer: yliu <yl...@apache.org>
Committed: Tue Oct 28 21:11:31 2014 +0800
----------------------------------------------------------------------
.../org/apache/hadoop/crypto/CipherOption.java | 66 +++++
.../apache/hadoop/crypto/CryptoInputStream.java | 45 +++-
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 +
.../java/org/apache/hadoop/hdfs/DFSClient.java | 2 +-
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 2 +
.../datatransfer/sasl/DataTransferSaslUtil.java | 266 ++++++++++++++++++-
.../sasl/SaslDataTransferClient.java | 47 +++-
.../sasl/SaslDataTransferServer.java | 34 ++-
.../datatransfer/sasl/SaslParticipant.java | 44 +++
.../SaslResponseWithNegotiatedCipherOption.java | 33 +++
.../apache/hadoop/hdfs/protocolPB/PBHelper.java | 79 ++++++
.../hadoop/hdfs/server/balancer/Dispatcher.java | 2 +-
.../hadoop/hdfs/server/datanode/DNConf.java | 11 +
.../hadoop/hdfs/server/datanode/DataNode.java | 4 +-
.../src/main/proto/datatransfer.proto | 1 +
.../hadoop-hdfs/src/main/proto/hdfs.proto | 11 +
.../src/main/resources/hdfs-default.xml | 12 +
.../hadoop/hdfs/TestEncryptedTransfer.java | 52 +++-
18 files changed, 671 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/58c0bb9e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CipherOption.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CipherOption.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CipherOption.java
new file mode 100644
index 0000000..6a8d8d0
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CipherOption.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.crypto;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Used between client and server to negotiate the
+ * cipher suite, key and iv.
+ *
+ * Holds one cipher suite together with an "in" key/iv pair and an "out"
+ * key/iv pair. All fields are final. The byte arrays are stored and
+ * returned by reference (no defensive copy is made), so a later
+ * modification of an array by a caller is visible through this object.
+ * Any of the four arrays may be null; the single-argument constructor
+ * leaves all of them null.
+ */
+@InterfaceAudience.Private
+public class CipherOption {
+ private final CipherSuite suite;
+ private final byte[] inKey;
+ private final byte[] inIv;
+ private final byte[] outKey;
+ private final byte[] outIv;
+
+ public CipherOption(CipherSuite suite) {
+ this(suite, null, null, null, null); // suite only: no keys or ivs set
+ }
+
+ public CipherOption(CipherSuite suite, byte[] inKey, byte[] inIv,
+ byte[] outKey, byte[] outIv) {
+ this.suite = suite;
+ this.inKey = inKey; // arrays are kept by reference, not cloned
+ this.inIv = inIv;
+ this.outKey = outKey;
+ this.outIv = outIv;
+ }
+
+ public CipherSuite getCipherSuite() {
+ return suite;
+ }
+
+ public byte[] getInKey() {
+ return inKey; // may be null; returns the internal array itself
+ }
+
+ public byte[] getInIv() {
+ return inIv; // may be null; returns the internal array itself
+ }
+
+ public byte[] getOutKey() {
+ return outKey; // may be null; returns the internal array itself
+ }
+
+ public byte[] getOutIv() {
+ return outIv; // may be null; returns the internal array itself
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/58c0bb9e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java
index 68e9697..4b53563 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java
@@ -23,6 +23,7 @@ import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
import java.security.GeneralSecurityException;
import java.util.EnumSet;
import java.util.Queue;
@@ -57,7 +58,8 @@ import com.google.common.base.Preconditions;
@InterfaceStability.Evolving
public class CryptoInputStream extends FilterInputStream implements
Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor,
- CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess {
+ CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess,
+ ReadableByteChannel {
private static final byte[] oneByteBuf = new byte[1];
private final CryptoCodec codec;
private final Decryptor decryptor;
@@ -92,6 +94,8 @@ public class CryptoInputStream extends FilterInputStream implements
private final byte[] key;
private final byte[] initIV;
private byte[] iv;
+ private final boolean isByteBufferReadable;
+ private final boolean isReadableByteChannel;
/** DirectBuffer pool */
private final Queue<ByteBuffer> bufferPool =
@@ -115,6 +119,8 @@ public class CryptoInputStream extends FilterInputStream implements
this.initIV = iv.clone();
this.iv = iv.clone();
this.streamOffset = streamOffset;
+ isByteBufferReadable = in instanceof ByteBufferReadable;
+ isReadableByteChannel = in instanceof ReadableByteChannel;
inBuffer = ByteBuffer.allocateDirect(this.bufferSize);
outBuffer = ByteBuffer.allocateDirect(this.bufferSize);
decryptor = getDecryptor();
@@ -165,9 +171,11 @@ public class CryptoInputStream extends FilterInputStream implements
* it can avoid bytes copy.
*/
if (usingByteBufferRead == null) {
- if (in instanceof ByteBufferReadable) {
+ if (isByteBufferReadable || isReadableByteChannel) {
try {
- n = ((ByteBufferReadable) in).read(inBuffer);
+ n = isByteBufferReadable ?
+ ((ByteBufferReadable) in).read(inBuffer) :
+ ((ReadableByteChannel) in).read(inBuffer);
usingByteBufferRead = Boolean.TRUE;
} catch (UnsupportedOperationException e) {
usingByteBufferRead = Boolean.FALSE;
@@ -180,7 +188,8 @@ public class CryptoInputStream extends FilterInputStream implements
}
} else {
if (usingByteBufferRead) {
- n = ((ByteBufferReadable) in).read(inBuffer);
+ n = isByteBufferReadable ? ((ByteBufferReadable) in).read(inBuffer) :
+ ((ReadableByteChannel) in).read(inBuffer);
} else {
n = readFromUnderlyingStream(inBuffer);
}
@@ -450,7 +459,7 @@ public class CryptoInputStream extends FilterInputStream implements
@Override
public int read(ByteBuffer buf) throws IOException {
checkStream();
- if (in instanceof ByteBufferReadable) {
+ if (isByteBufferReadable || isReadableByteChannel) {
final int unread = outBuffer.remaining();
if (unread > 0) { // Have unread decrypted data in buffer.
int toRead = buf.remaining();
@@ -466,7 +475,8 @@ public class CryptoInputStream extends FilterInputStream implements
}
final int pos = buf.position();
- final int n = ((ByteBufferReadable) in).read(buf);
+ final int n = isByteBufferReadable ? ((ByteBufferReadable) in).read(buf) :
+ ((ReadableByteChannel) in).read(buf);
if (n > 0) {
streamOffset += n; // Read n bytes
decrypt(buf, n, pos);
@@ -481,10 +491,22 @@ public class CryptoInputStream extends FilterInputStream implements
return unread;
}
}
+ } else {
+ int n = 0;
+ if (buf.hasArray()) {
+ n = read(buf.array(), buf.position(), buf.remaining());
+ if (n > 0) {
+ buf.position(buf.position() + n);
+ }
+ } else {
+ byte[] tmp = new byte[buf.remaining()];
+ n = read(tmp);
+ if (n > 0) {
+ buf.put(tmp, 0, n);
+ }
+ }
+ return n;
}
-
- throw new UnsupportedOperationException("ByteBuffer read unsupported " +
- "by input stream.");
}
/**
@@ -686,4 +708,9 @@ public class CryptoInputStream extends FilterInputStream implements
decryptorPool.add(decryptor);
}
}
+
+ @Override // needed now that the class implements ReadableByteChannel
+ public boolean isOpen() {
+ return !closed; // 'closed' is declared outside this hunk; presumably set by close() - confirm
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/58c0bb9e/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index e18b935..13b2e9f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -663,6 +663,8 @@ Release 2.6.0 - UNRELEASED
HDFS-7122. Use of ThreadLocal<Random> results in poor block placement.
(wang)
+ HDFS-6606. Optimize HDFS Encrypted Transport performance. (yliu)
+
BUG FIXES
HDFS-6823. dfs.web.authentication.kerberos.principal shows up in logs for
http://git-wip-us.apache.org/repos/asf/hadoop/blob/58c0bb9e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 97ffdde..5ba6273 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -686,7 +686,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
this.initThreadsNumForHedgedReads(numThreads);
}
this.saslClient = new SaslDataTransferClient(
- DataTransferSaslUtil.getSaslPropertiesResolver(conf),
+ conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf),
TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/58c0bb9e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 5d7d252..59eaa20 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -600,6 +600,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
// Security-related configs
public static final String DFS_ENCRYPT_DATA_TRANSFER_KEY = "dfs.encrypt.data.transfer";
public static final boolean DFS_ENCRYPT_DATA_TRANSFER_DEFAULT = false;
+ public static final String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY = "dfs.encrypt.data.transfer.cipher.key.bitlength";
+ public static final int DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT = 128;
public static final String DFS_DATA_ENCRYPTION_ALGORITHM_KEY = "dfs.encrypt.data.transfer.algorithm";
public static final String DFS_TRUSTEDCHANNEL_RESOLVER_CLASS = "dfs.trustedchannel.resolver.class";
public static final String DFS_DATA_TRANSFER_PROTECTION_KEY = "dfs.data.transfer.protection";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/58c0bb9e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java
index 81d740f..2d5e13c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java
@@ -21,6 +21,8 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_PROT
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT;
import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
import java.io.IOException;
@@ -28,6 +30,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.security.sasl.Sasl;
@@ -35,10 +38,18 @@ import javax.security.sasl.Sasl;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CipherOption;
+import org.apache.hadoop.crypto.CipherSuite;
+import org.apache.hadoop.crypto.CryptoCodec;
+import org.apache.hadoop.crypto.CryptoInputStream;
+import org.apache.hadoop.crypto.CryptoOutputStream;
import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CipherOptionProto;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.security.SaslPropertiesResolver;
import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection;
import org.slf4j.Logger;
@@ -95,6 +106,19 @@ public final class DataTransferSaslUtil {
"requested = %s, negotiated = %s", requestedQop, negotiatedQop));
}
}
+
+ /**
+ * Check whether requested SASL Qop contains privacy.
+ *
+ * The value stored under {@link Sasl#QOP} is split on "," and the result
+ * is true when one of the resulting entries is exactly "auth-conf".
+ * Entries are not trimmed, so surrounding whitespace prevents a match.
+ *
+ * NOTE(review): the value of saslProps.get(Sasl.QOP) is dereferenced
+ * without a null check, so a map without that entry (or a null map)
+ * results in a NullPointerException - confirm callers always set it.
+ *
+ * @param saslProps properties of SASL negotiation
+ * @return boolean true if privacy exists
+ */
+ public static boolean requestedQopContainsPrivacy(
+ Map<String, String> saslProps) {
+ Set<String> requestedQop = ImmutableSet.copyOf(Arrays.asList(
+ saslProps.get(Sasl.QOP).split(",")));
+ return requestedQop.contains("auth-conf");
+ }
/**
* Creates SASL properties required for an encrypted SASL negotiation.
@@ -177,20 +201,6 @@ public final class DataTransferSaslUtil {
}
/**
- * Performs the first step of SASL negotiation.
- *
- * @param out connection output stream
- * @param in connection input stream
- * @param sasl participant
- */
- public static void performSaslStep1(OutputStream out, InputStream in,
- SaslParticipant sasl) throws IOException {
- byte[] remoteResponse = readSaslMessage(in);
- byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
- sendSaslMessage(out, localResponse);
- }
-
- /**
* Reads a SASL negotiation message.
*
* @param in stream to read
@@ -208,6 +218,124 @@ public final class DataTransferSaslUtil {
return proto.getPayload().toByteArray();
}
}
+
+ /**
+ * Reads a SASL negotiation message and negotiation cipher options.
+ *
+ * Parses one vint-prefixed DataTransferEncryptorMessageProto from the
+ * stream. For any status other than ERROR_UNKNOWN_KEY and ERROR, every
+ * cipher option carried by the message is converted with PBHelper and
+ * appended to the caller-supplied list, then the payload is returned.
+ *
+ * NOTE(review): generated protobuf list getters normally return an empty
+ * list rather than null, so the null check on the option list looks
+ * purely defensive - confirm.
+ *
+ * @param in stream to read
+ * @param cipherOptions list to store negotiation cipher options; it is
+ * appended to, so it must be non-null and modifiable
+ * @return byte[] SASL negotiation message
+ * @throws InvalidEncryptionKeyException if the message status is
+ * ERROR_UNKNOWN_KEY
+ * @throws IOException for any error, including a message status of ERROR
+ */
+ public static byte[] readSaslMessageAndNegotiationCipherOptions(
+ InputStream in, List<CipherOption> cipherOptions) throws IOException {
+ DataTransferEncryptorMessageProto proto =
+ DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in));
+ if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
+ throw new InvalidEncryptionKeyException(proto.getMessage());
+ } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
+ throw new IOException(proto.getMessage());
+ } else {
+ List<CipherOptionProto> optionProtos = proto.getCipherOptionList();
+ if (optionProtos != null) {
+ for (CipherOptionProto optionProto : optionProtos) {
+ cipherOptions.add(PBHelper.convert(optionProto));
+ }
+ }
+ return proto.getPayload().toByteArray();
+ }
+ }
+
+ /**
+ * Negotiate a cipher option which server supports.
+ *
+ * Walks the offered options in order and picks the first one whose suite
+ * is AES/CTR/NoPadding. For that suite it generates a fresh random in-key,
+ * in-iv, out-key and out-iv using the CryptoCodec obtained for the suite.
+ * Key length in bytes is the configured bit length divided by 8 (integer
+ * division); iv length is the suite's algorithm block size.
+ *
+ * NOTE(review): the configured bit length is not validated in this
+ * method, so an unsupported value is not rejected here.
+ *
+ * @param conf the configuration, read for the key bit length and used to
+ * obtain the CryptoCodec
+ * @param options the cipher options which client supports
+ * @return CipherOption negotiated cipher option, or null if options is
+ * null or contains no AES/CTR/NoPadding entry
+ */
+ public static CipherOption negotiateCipherOption(Configuration conf,
+ List<CipherOption> options) {
+ if (options != null) {
+ for (CipherOption option : options) {
+ // Currently we support AES/CTR/NoPadding
+ CipherSuite suite = option.getCipherSuite();
+ if (suite == CipherSuite.AES_CTR_NOPADDING) {
+ int keyLen = conf.getInt(
+ DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY,
+ DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT) / 8;
+ CryptoCodec codec = CryptoCodec.getInstance(conf, suite);
+ byte[] inKey = new byte[keyLen];
+ byte[] inIv = new byte[suite.getAlgorithmBlockSize()];
+ byte[] outKey = new byte[keyLen];
+ byte[] outIv = new byte[suite.getAlgorithmBlockSize()];
+ codec.generateSecureRandom(inKey);
+ codec.generateSecureRandom(inIv);
+ codec.generateSecureRandom(outKey);
+ codec.generateSecureRandom(outIv);
+ return new CipherOption(suite, inKey, inIv, outKey, outIv);
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Send SASL message and negotiated cipher option to client.
+ *
+ * Builds a DataTransferEncryptorMessageProto with status SUCCESS, writes
+ * it length-delimited to the stream and flushes. The payload field is set
+ * only when payload is non-null, and a single cipher option is attached
+ * only when option is non-null.
+ *
+ * @param out stream to receive message
+ * @param payload to send
+ * @param option negotiated cipher option
+ * @throws IOException for any error
+ */
+ public static void sendSaslMessageAndNegotiatedCipherOption(
+ OutputStream out, byte[] payload, CipherOption option)
+ throws IOException {
+ DataTransferEncryptorMessageProto.Builder builder =
+ DataTransferEncryptorMessageProto.newBuilder();
+
+ builder.setStatus(DataTransferEncryptorStatus.SUCCESS);
+ if (payload != null) {
+ builder.setPayload(ByteString.copyFrom(payload));
+ }
+ if (option != null) {
+ builder.addCipherOption(PBHelper.convert(option));
+ }
+
+ DataTransferEncryptorMessageProto proto = builder.build();
+ proto.writeDelimitedTo(out);
+ out.flush();
+ }
+
+ /**
+ * Create IOStreamPair of {@link org.apache.hadoop.crypto.CryptoInputStream}
+ * and {@link org.apache.hadoop.crypto.CryptoOutputStream}
+ *
+ * The two sides use the key/iv pairs in mirrored fashion: on the server
+ * the input stream is built with the "in" key/iv and the output stream
+ * with the "out" key/iv; on the client the assignment is reversed, so
+ * that each side reads with the pair the other side writes with.
+ *
+ * @param conf the configuration
+ * @param cipherOption negotiated cipher option; dereferenced without a
+ * null check, so it must be non-null
+ * @param out underlying output stream
+ * @param in underlying input stream
+ * @param isServer is server side
+ * @return IOStreamPair the stream pair
+ * @throws IOException for any error
+ */
+ public static IOStreamPair createStreamPair(Configuration conf,
+ CipherOption cipherOption, OutputStream out, InputStream in,
+ boolean isServer) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Creating IOStreamPair of CryptoInputStream and " +
+ "CryptoOutputStream.");
+ }
+ CryptoCodec codec = CryptoCodec.getInstance(conf,
+ cipherOption.getCipherSuite());
+ byte[] inKey = cipherOption.getInKey();
+ byte[] inIv = cipherOption.getInIv();
+ byte[] outKey = cipherOption.getOutKey();
+ byte[] outIv = cipherOption.getOutIv();
+ InputStream cIn = new CryptoInputStream(in, codec,
+ isServer ? inKey : outKey, isServer ? inIv : outIv);
+ OutputStream cOut = new CryptoOutputStream(out, codec,
+ isServer ? outKey : inKey, isServer ? outIv : inIv);
+ return new IOStreamPair(cIn, cOut);
+ }
/**
* Sends a SASL negotiation message indicating an error.
@@ -232,6 +360,116 @@ public final class DataTransferSaslUtil {
throws IOException {
sendSaslMessage(out, DataTransferEncryptorStatus.SUCCESS, payload, null);
}
+
+ /**
+ * Send a SASL negotiation message and negotiation cipher options to server.
+ *
+ * Builds a DataTransferEncryptorMessageProto with status SUCCESS, writes
+ * it length-delimited to the stream and flushes. The payload field is set
+ * only when payload is non-null, and the cipher options are attached only
+ * when options is non-null.
+ *
+ * @param out stream to receive message
+ * @param payload to send
+ * @param options cipher options to negotiate
+ * @throws IOException for any error
+ */
+ public static void sendSaslMessageAndNegotiationCipherOptions(
+ OutputStream out, byte[] payload, List<CipherOption> options)
+ throws IOException {
+ DataTransferEncryptorMessageProto.Builder builder =
+ DataTransferEncryptorMessageProto.newBuilder();
+
+ builder.setStatus(DataTransferEncryptorStatus.SUCCESS);
+ if (payload != null) {
+ builder.setPayload(ByteString.copyFrom(payload));
+ }
+ if (options != null) {
+ builder.addAllCipherOption(PBHelper.convertCipherOptions(options));
+ }
+
+ DataTransferEncryptorMessageProto proto = builder.build();
+ proto.writeDelimitedTo(out);
+ out.flush();
+ }
+
+ /**
+ * Read SASL message and negotiated cipher option from server.
+ *
+ * Parses one vint-prefixed DataTransferEncryptorMessageProto from the
+ * stream. Only the first cipher option in the message is used; when the
+ * converted list is null or empty, the returned object carries a null
+ * cipher option.
+ *
+ * @param in stream to read
+ * @return SaslResponseWithNegotiatedCipherOption SASL message and
+ * negotiated cipher option
+ * @throws InvalidEncryptionKeyException if the message status is
+ * ERROR_UNKNOWN_KEY
+ * @throws IOException for any error, including a message status of ERROR
+ */
+ public static SaslResponseWithNegotiatedCipherOption
+ readSaslMessageAndNegotiatedCipherOption(InputStream in)
+ throws IOException {
+ DataTransferEncryptorMessageProto proto =
+ DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in));
+ if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
+ throw new InvalidEncryptionKeyException(proto.getMessage());
+ } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
+ throw new IOException(proto.getMessage());
+ } else {
+ byte[] response = proto.getPayload().toByteArray();
+ List<CipherOption> options = PBHelper.convertCipherOptionProtos(
+ proto.getCipherOptionList());
+ CipherOption option = null;
+ if (options != null && !options.isEmpty()) {
+ option = options.get(0);
+ }
+ return new SaslResponseWithNegotiatedCipherOption(response, option);
+ }
+ }
+
+ /**
+ * Encrypt the keys of the negotiated cipher option.
+ *
+ * Each non-null key (in-key and out-key) is passed through sasl.wrap and
+ * a new CipherOption is built from the results. The cipher suite and both
+ * ivs are copied over unchanged; the ivs are not wrapped.
+ *
+ * @param option negotiated cipher option
+ * @param sasl SASL participant representing server
+ * @return CipherOption negotiated cipher option which contains the
+ * wrapped keys and the original ivs, or null if option is null
+ * @throws IOException for any error
+ */
+ public static CipherOption wrap(CipherOption option, SaslParticipant sasl)
+ throws IOException {
+ if (option != null) {
+ byte[] inKey = option.getInKey();
+ if (inKey != null) {
+ inKey = sasl.wrap(inKey, 0, inKey.length);
+ }
+ byte[] outKey = option.getOutKey();
+ if (outKey != null) {
+ outKey = sasl.wrap(outKey, 0, outKey.length);
+ }
+ return new CipherOption(option.getCipherSuite(), inKey, option.getInIv(),
+ outKey, option.getOutIv());
+ }
+
+ return null;
+ }
+
+ /**
+ * Decrypt the keys of the negotiated cipher option.
+ *
+ * Each non-null key (in-key and out-key) is passed through sasl.unwrap
+ * and a new CipherOption is built from the results. The cipher suite and
+ * both ivs are copied over unchanged; the ivs are not unwrapped.
+ *
+ * @param option negotiated cipher option
+ * @param sasl SASL participant representing client
+ * @return CipherOption negotiated cipher option which contains the
+ * unwrapped keys and the original ivs, or null if option is null
+ * @throws IOException for any error
+ */
+ public static CipherOption unwrap(CipherOption option, SaslParticipant sasl)
+ throws IOException {
+ if (option != null) {
+ byte[] inKey = option.getInKey();
+ if (inKey != null) {
+ inKey = sasl.unwrap(inKey, 0, inKey.length);
+ }
+ byte[] outKey = option.getOutKey();
+ if (outKey != null) {
+ outKey = sasl.unwrap(outKey, 0, outKey.length);
+ }
+ return new CipherOption(option.getCipherSuite(), inKey, option.getInIv(),
+ outKey, option.getOutIv());
+ }
+
+ return null;
+ }
/**
* Sends a SASL negotiation message.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/58c0bb9e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java
index 9df9929..cfcc91f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
import static org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.*;
import java.io.DataInputStream;
@@ -27,6 +26,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.Socket;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -40,6 +40,9 @@ import javax.security.sasl.RealmChoiceCallback;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CipherOption;
+import org.apache.hadoop.crypto.CipherSuite;
import org.apache.hadoop.hdfs.net.EncryptedPeer;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
@@ -54,6 +57,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
/**
* Negotiates SASL for DataTransferProtocol on behalf of a client. There are
@@ -72,6 +76,7 @@ public class SaslDataTransferClient {
private static final Logger LOG = LoggerFactory.getLogger(
SaslDataTransferClient.class);
+ private final Configuration conf;
private final AtomicBoolean fallbackToSimpleAuth;
private final SaslPropertiesResolver saslPropsResolver;
private final TrustedChannelResolver trustedChannelResolver;
@@ -82,27 +87,32 @@ public class SaslDataTransferClient {
* simple auth. For intra-cluster connections between data nodes in the same
* cluster, we can assume that all run under the same security configuration.
*
+ * @param conf the configuration
* @param saslPropsResolver for determining properties of SASL negotiation
* @param trustedChannelResolver for identifying trusted connections that do
* not require SASL negotiation
*/
- public SaslDataTransferClient(SaslPropertiesResolver saslPropsResolver,
+ public SaslDataTransferClient(Configuration conf,
+ SaslPropertiesResolver saslPropsResolver,
TrustedChannelResolver trustedChannelResolver) {
- this(saslPropsResolver, trustedChannelResolver, null);
+ this(conf, saslPropsResolver, trustedChannelResolver, null);
}
/**
* Creates a new SaslDataTransferClient.
*
+ * @param conf the configuration
* @param saslPropsResolver for determining properties of SASL negotiation
* @param trustedChannelResolver for identifying trusted connections that do
* not require SASL negotiation
* @param fallbackToSimpleAuth checked on each attempt at general SASL
* handshake, if true forces use of simple auth
*/
- public SaslDataTransferClient(SaslPropertiesResolver saslPropsResolver,
+ public SaslDataTransferClient(Configuration conf,
+ SaslPropertiesResolver saslPropsResolver,
TrustedChannelResolver trustedChannelResolver,
AtomicBoolean fallbackToSimpleAuth) {
+ this.conf = conf;
this.fallbackToSimpleAuth = fallbackToSimpleAuth;
this.saslPropsResolver = saslPropsResolver;
this.trustedChannelResolver = trustedChannelResolver;
@@ -436,17 +446,38 @@ public class SaslDataTransferClient {
sendSaslMessage(out, new byte[0]);
// step 1
- performSaslStep1(out, in, sasl);
-
- // step 2 (client-side only)
byte[] remoteResponse = readSaslMessage(in);
byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
+ List<CipherOption> cipherOptions = null;
+ if (requestedQopContainsPrivacy(saslProps)) {
+ // Negotiation cipher options
+ CipherOption option = new CipherOption(CipherSuite.AES_CTR_NOPADDING);
+ cipherOptions = Lists.newArrayListWithCapacity(1);
+ cipherOptions.add(option);
+ }
+ sendSaslMessageAndNegotiationCipherOptions(out, localResponse,
+ cipherOptions);
+
+ // step 2 (client-side only)
+ SaslResponseWithNegotiatedCipherOption response =
+ readSaslMessageAndNegotiatedCipherOption(in);
+ localResponse = sasl.evaluateChallengeOrResponse(response.payload);
assert localResponse == null;
// SASL handshake is complete
checkSaslComplete(sasl, saslProps);
- return sasl.createStreamPair(out, in);
+ CipherOption cipherOption = null;
+ if (sasl.isNegotiatedQopPrivacy()) {
+ // Unwrap the negotiated cipher option
+ cipherOption = unwrap(response.cipherOption, sasl);
+ }
+
+ // If negotiated cipher option is not null, we will use it to create
+ // stream pair.
+ return cipherOption != null ? createStreamPair(
+ conf, cipherOption, underlyingOut, underlyingIn, false) :
+ sasl.createStreamPair(out, in);
} catch (IOException ioe) {
sendGenericSaslErrorMessage(out, ioe.getMessage());
throw ioe;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/58c0bb9e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java
index 2b82c82..005856d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java
@@ -26,6 +26,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.List;
import java.util.Map;
import javax.security.auth.callback.Callback;
@@ -39,6 +40,7 @@ import javax.security.sasl.SaslException;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.crypto.CipherOption;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
@@ -53,6 +55,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
/**
* Negotiates SASL for DataTransferProtocol on behalf of a server. There are
@@ -351,17 +354,40 @@ public class SaslDataTransferServer {
}
try {
// step 1
- performSaslStep1(out, in, sasl);
-
- // step 2 (server-side only)
byte[] remoteResponse = readSaslMessage(in);
byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
sendSaslMessage(out, localResponse);
+ // step 2 (server-side only)
+ List<CipherOption> cipherOptions = Lists.newArrayList();
+ remoteResponse = readSaslMessageAndNegotiationCipherOptions(
+ in, cipherOptions);
+ localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
+
// SASL handshake is complete
checkSaslComplete(sasl, saslProps);
- return sasl.createStreamPair(out, in);
+ CipherOption cipherOption = null;
+ if (sasl.isNegotiatedQopPrivacy()) {
+ // Negotiate a cipher option
+ cipherOption = negotiateCipherOption(dnConf.getConf(), cipherOptions);
+ if (cipherOption != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Server using cipher suite " +
+ cipherOption.getCipherSuite().getName());
+ }
+ }
+ }
+
+ // If negotiated cipher option is not null, wrap it before sending.
+ sendSaslMessageAndNegotiatedCipherOption(out, localResponse,
+ wrap(cipherOption, sasl));
+
+ // If negotiated cipher option is not null, we will use it to create
+ // stream pair.
+ return cipherOption != null ? createStreamPair(
+ dnConf.getConf(), cipherOption, underlyingOut, underlyingIn, true) :
+ sasl.createStreamPair(out, in);
} catch (IOException ioe) {
if (ioe instanceof SaslException &&
ioe.getCause() != null &&
http://git-wip-us.apache.org/repos/asf/hadoop/blob/58c0bb9e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java
index 106e297..f14a075 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java
@@ -129,6 +129,50 @@ class SaslParticipant {
return (String) saslServer.getNegotiatedProperty(Sasl.QOP);
}
}
+
+ /**
+ * After successful SASL negotiation, returns whether the negotiated QOP is
+ * privacy ("auth-conf", compared case-insensitively).
+ *
+ * @return true if the negotiated QOP is "auth-conf"; false otherwise
+ */
+ public boolean isNegotiatedQopPrivacy() {
+ return "auth-conf".equalsIgnoreCase(getNegotiatedQop());
+ }
+
+ /**
+ * Wraps a byte array using the SASL client if set, else the SASL server.
+ *
+ * @param bytes The array containing the bytes to wrap.
+ * @param off The starting offset in the array
+ * @param len The number of bytes to wrap
+ * @return the wrapped bytes
+ * @throws SaslException if the bytes cannot be successfully wrapped
+ */
+ public byte[] wrap(byte[] bytes, int off, int len) throws SaslException {
+ if (saslClient != null) {
+ return saslClient.wrap(bytes, off, len);
+ } else {
+ return saslServer.wrap(bytes, off, len);
+ }
+ }
+
+ /**
+ * Unwraps a byte array using the SASL client if set, else the SASL server.
+ *
+ * @param bytes The array containing the bytes to unwrap.
+ * @param off The starting offset in the array
+ * @param len The number of bytes to unwrap
+ * @return the unwrapped bytes
+ * @throws SaslException if the bytes cannot be successfully unwrapped
+ */
+ public byte[] unwrap(byte[] bytes, int off, int len) throws SaslException {
+ if (saslClient != null) {
+ return saslClient.unwrap(bytes, off, len);
+ } else {
+ return saslServer.unwrap(bytes, off, len);
+ }
+ }
/**
* Returns true if SASL negotiation is complete.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/58c0bb9e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslResponseWithNegotiatedCipherOption.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslResponseWithNegotiatedCipherOption.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslResponseWithNegotiatedCipherOption.java
new file mode 100644
index 0000000..f69441b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslResponseWithNegotiatedCipherOption.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.crypto.CipherOption;
+
+@InterfaceAudience.Private
+public class SaslResponseWithNegotiatedCipherOption { // simple value holder
+ final byte[] payload; // kept by reference; the array is not copied
+ final CipherOption cipherOption;
+
+ public SaslResponseWithNegotiatedCipherOption(byte[] payload,
+ CipherOption cipherOption) {
+ this.payload = payload;
+ this.cipherOption = cipherOption;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/58c0bb9e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index e48e85f..53ea6cf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats;
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolStats;
+import org.apache.hadoop.crypto.CipherOption;
import org.apache.hadoop.crypto.CipherSuite;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
@@ -128,6 +129,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockWithLocationsProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlocksWithLocationsProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointSignatureProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CipherOptionProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ContentSummaryProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CorruptFileBlocksProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DataEncryptionKeyProto;
@@ -2689,6 +2691,83 @@ public class PBHelper {
return GetEditsFromTxidResponseProto.newBuilder().setEventsList(
builder.build()).build();
}
+
+ public static CipherOptionProto convert(CipherOption option) {
+ if (option == null) {
+ return null;
+ }
+ // Only non-null fields are copied; null fields stay unset in the proto.
+ CipherOptionProto.Builder protoBuilder = CipherOptionProto.newBuilder();
+ if (option.getCipherSuite() != null) {
+ protoBuilder.setSuite(convert(option.getCipherSuite()));
+ }
+ if (option.getInKey() != null) {
+ protoBuilder.setInKey(ByteString.copyFrom(option.getInKey()));
+ }
+ if (option.getInIv() != null) {
+ protoBuilder.setInIv(ByteString.copyFrom(option.getInIv()));
+ }
+ if (option.getOutKey() != null) {
+ protoBuilder.setOutKey(ByteString.copyFrom(option.getOutKey()));
+ }
+ if (option.getOutIv() != null) {
+ protoBuilder.setOutIv(ByteString.copyFrom(option.getOutIv()));
+ }
+ return protoBuilder.build();
+ }
+
+ /**
+ * Converts a CipherOptionProto into a CipherOption.
+ *
+ * Presence is tested with the generated has*() methods: the generated
+ * getters return a default value rather than null for an unset field, so
+ * a null check on them can never detect absence. A field that is not set
+ * in the proto therefore becomes null here, mirroring
+ * convert(CipherOption), which leaves null fields unset.
+ *
+ * @param proto the proto to convert, may be null
+ * @return the converted CipherOption, or null if proto is null
+ */
+ public static CipherOption convert(CipherOptionProto proto) {
+ if (proto == null) {
+ return null;
+ }
+
+ // An unset field maps to null, never to a default/empty value.
+ CipherSuite suite = proto.hasSuite() ? convert(proto.getSuite()) : null;
+ byte[] inKey = proto.hasInKey() ? proto.getInKey().toByteArray() : null;
+ byte[] inIv = proto.hasInIv() ? proto.getInIv().toByteArray() : null;
+ byte[] outKey =
+ proto.hasOutKey() ? proto.getOutKey().toByteArray() : null;
+ byte[] outIv = proto.hasOutIv() ? proto.getOutIv().toByteArray() : null;
+ return new CipherOption(suite, inKey, inIv, outKey, outIv);
+ }
+
+ public static List<CipherOptionProto> convertCipherOptions(
+ List<CipherOption> options) {
+ if (options == null) {
+ return null;
+ }
+ List<CipherOptionProto> converted =
+ Lists.newArrayListWithCapacity(options.size());
+ for (CipherOption cipherOption : options) {
+ converted.add(convert(cipherOption));
+ }
+ return converted;
+ }
+
+ public static List<CipherOption> convertCipherOptionProtos(
+ List<CipherOptionProto> protos) {
+ if (protos == null) {
+ return null;
+ }
+ List<CipherOption> converted =
+ Lists.newArrayListWithCapacity(protos.size());
+ for (CipherOptionProto optionProto : protos) {
+ converted.add(convert(optionProto));
+ }
+ return converted;
+ }
public static CipherSuiteProto convert(CipherSuite suite) {
switch (suite) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/58c0bb9e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index cea1ab7..8b881e9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -785,7 +785,7 @@ public class Dispatcher {
: Executors.newFixedThreadPool(dispatcherThreads);
this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode;
- this.saslClient = new SaslDataTransferClient(
+ this.saslClient = new SaslDataTransferClient(conf,
DataTransferSaslUtil.getSaslPropertiesResolver(conf),
TrustedChannelResolver.getInstance(conf), nnc.fallbackToSimpleAuth);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/58c0bb9e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
index 3127682..67cd1ce 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.security.SaslPropertiesResolver;
*/
@InterfaceAudience.Private
public class DNConf {
+ final Configuration conf;
final int socketTimeout;
final int socketWriteTimeout;
final int socketKeepaliveTimeout;
@@ -100,6 +101,7 @@ public class DNConf {
final long maxLockedMemory;
public DNConf(Configuration conf) {
+ this.conf = conf;
socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
HdfsServerConstants.READ_TIMEOUT);
socketWriteTimeout = conf.getInt(DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
@@ -197,6 +199,15 @@ public class DNConf {
String getMinimumNameNodeVersion() {
return this.minimumNameNodeVersion;
}
+
+ /**
+ * Returns the Configuration this DNConf was constructed with.
+ *
+ * @return the Configuration passed to the constructor
+ */
+ public Configuration getConf() {
+ return conf;
+ }
/**
* Returns true if encryption enabled for DataTransferProtocol.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/58c0bb9e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index e4b5425..32c383f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1070,8 +1070,8 @@ public class DataNode extends ReconfigurableBase
// Create the ReadaheadPool from the DataNode context so we can
// exit without having to explicitly shutdown its thread pool.
readaheadPool = ReadaheadPool.getInstance();
- saslClient = new SaslDataTransferClient(dnConf.saslPropsResolver,
- dnConf.trustedChannelResolver);
+ saslClient = new SaslDataTransferClient(dnConf.conf,
+ dnConf.saslPropsResolver, dnConf.trustedChannelResolver);
saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/58c0bb9e/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
index 50cc00d..fd1ba8a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
@@ -42,6 +42,7 @@ message DataTransferEncryptorMessageProto {
required DataTransferEncryptorStatus status = 1;
optional bytes payload = 2;
optional string message = 3;
+ repeated CipherOptionProto cipherOption = 4;
}
message BaseHeaderProto {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/58c0bb9e/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
index 10af3b8..04a8f3f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
@@ -265,6 +265,17 @@ message ZoneEncryptionInfoProto {
}
/**
+ * Cipher option: a cipher suite with optional in/out keys and IVs.
+ */
+message CipherOptionProto {
+ required CipherSuiteProto suite = 1;
+ optional bytes inKey = 2;
+ optional bytes inIv = 3;
+ optional bytes outKey = 4;
+ optional bytes outIv = 5;
+}
+
+/**
* A set of file blocks and their locations.
*/
message LocatedBlocksProto {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/58c0bb9e/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index a4282b5..38d3c50 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -1514,6 +1514,18 @@
the configured JCE default on the system is used (usually 3DES.) It is
widely believed that 3DES is more cryptographically secure, but RC4 is
substantially faster.
+
+ Note that if AES is supported by both the client and server then this
+ encryption algorithm will only be used to initially transfer keys for AES.
+ </description>
+</property>
+
+<property>
+ <name>dfs.encrypt.data.transfer.cipher.key.bitlength</name>
+ <value>128</value>
+ <description>
+ The key bitlength negotiated by dfsclient and datanode for encryption.
+ This value may be set to 128, 192 or 256.
</description>
</property>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/58c0bb9e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java
index 131912d..7f6ad1a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java
@@ -37,11 +37,15 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferServer;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -50,6 +54,10 @@ import org.mockito.Mockito;
@RunWith(Parameterized.class)
public class TestEncryptedTransfer {
+ { // DEBUG level is needed: the tests assert on captured debug log output
+ LogManager.getLogger(SaslDataTransferServer.class).setLevel(Level.DEBUG);
+ LogManager.getLogger(DataTransferSaslUtil.class).setLevel(Level.DEBUG);
+ }
@Parameters
public static Collection<Object[]> data() {
@@ -111,9 +119,28 @@ public class TestEncryptedTransfer {
.build();
fs = getFileSystem(conf);
- assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
- assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
+ LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
+ LogFactory.getLog(SaslDataTransferServer.class));
+ LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs(
+ LogFactory.getLog(DataTransferSaslUtil.class));
+ try {
+ assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+ assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
+ } finally {
+ logs.stopCapturing();
+ logs1.stopCapturing();
+ }
+
fs.close();
+
+ if (resolverClazz != null && !resolverClazz.endsWith("TestTrustedChannelResolver")){
+ // Test client and server negotiate cipher option
+ GenericTestUtils.assertMatches(logs.getOutput(),
+ "Server using cipher suite");
+ // Check the IOStreamPair
+ GenericTestUtils.assertMatches(logs1.getOutput(),
+ "Creating IOStreamPair of CryptoInputStream and CryptoOutputStream.");
+ }
} finally {
if (cluster != null) {
cluster.shutdown();
@@ -403,9 +430,28 @@ public class TestEncryptedTransfer {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDns).build();
FileSystem fs = getFileSystem(conf);
- writeTestDataToFile(fs);
+
+ LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
+ LogFactory.getLog(SaslDataTransferServer.class));
+ LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs(
+ LogFactory.getLog(DataTransferSaslUtil.class));
+ try {
+ writeTestDataToFile(fs);
+ } finally {
+ logs.stopCapturing();
+ logs1.stopCapturing();
+ }
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
fs.close();
+
+ if (resolverClazz != null && !resolverClazz.endsWith("TestTrustedChannelResolver")){
+ // Test client and server negotiate cipher option
+ GenericTestUtils.assertMatches(logs.getOutput(),
+ "Server using cipher suite");
+ // Check the IOStreamPair
+ GenericTestUtils.assertMatches(logs1.getOutput(),
+ "Creating IOStreamPair of CryptoInputStream and CryptoOutputStream.");
+ }
} finally {
if (cluster != null) {
cluster.shutdown();