You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by yl...@apache.org on 2014/10/28 14:19:15 UTC

git commit: HDFS-6606. Optimize HDFS Encrypted Transport performance. (yliu)

Repository: hadoop
Updated Branches:
  refs/heads/trunk c9bec46c9 -> 58c0bb9ed


HDFS-6606. Optimize HDFS Encrypted Transport performance. (yliu)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/58c0bb9e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/58c0bb9e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/58c0bb9e

Branch: refs/heads/trunk
Commit: 58c0bb9ed9f4a2491395b63c68046562a73526c9
Parents: c9bec46
Author: yliu <yl...@apache.org>
Authored: Tue Oct 28 21:11:31 2014 +0800
Committer: yliu <yl...@apache.org>
Committed: Tue Oct 28 21:11:31 2014 +0800

----------------------------------------------------------------------
 .../org/apache/hadoop/crypto/CipherOption.java  |  66 +++++
 .../apache/hadoop/crypto/CryptoInputStream.java |  45 +++-
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   2 +
 .../java/org/apache/hadoop/hdfs/DFSClient.java  |   2 +-
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   2 +
 .../datatransfer/sasl/DataTransferSaslUtil.java | 266 ++++++++++++++++++-
 .../sasl/SaslDataTransferClient.java            |  47 +++-
 .../sasl/SaslDataTransferServer.java            |  34 ++-
 .../datatransfer/sasl/SaslParticipant.java      |  44 +++
 .../SaslResponseWithNegotiatedCipherOption.java |  33 +++
 .../apache/hadoop/hdfs/protocolPB/PBHelper.java |  79 ++++++
 .../hadoop/hdfs/server/balancer/Dispatcher.java |   2 +-
 .../hadoop/hdfs/server/datanode/DNConf.java     |  11 +
 .../hadoop/hdfs/server/datanode/DataNode.java   |   4 +-
 .../src/main/proto/datatransfer.proto           |   1 +
 .../hadoop-hdfs/src/main/proto/hdfs.proto       |  11 +
 .../src/main/resources/hdfs-default.xml         |  12 +
 .../hadoop/hdfs/TestEncryptedTransfer.java      |  52 +++-
 18 files changed, 671 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/58c0bb9e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CipherOption.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CipherOption.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CipherOption.java
new file mode 100644
index 0000000..6a8d8d0
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CipherOption.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.crypto;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Used between client and server to negotiate the cipher suite, key and iv.
+ * Key/iv arrays may be null and are stored and returned without copying.
+ */
+@InterfaceAudience.Private
+public class CipherOption {
+  private final CipherSuite suite;
+  private final byte[] inKey;
+  private final byte[] inIv;
+  private final byte[] outKey;
+  private final byte[] outIv;
+  
+  public CipherOption(CipherSuite suite) { // suite only; keys and ivs are null
+    this(suite, null, null, null, null);
+  }
+  
+  public CipherOption(CipherSuite suite, byte[] inKey, byte[] inIv, 
+      byte[] outKey, byte[] outIv) { // arrays are kept by reference, not copied
+    this.suite = suite;
+    this.inKey = inKey;
+    this.inIv = inIv;
+    this.outKey = outKey;
+    this.outIv = outIv;
+  }
+  
+  public CipherSuite getCipherSuite() {
+    return suite;
+  }
+  
+  public byte[] getInKey() { // the stored array itself (may be null)
+    return inKey;
+  }
+  
+  public byte[] getInIv() { // the stored array itself (may be null)
+    return inIv;
+  }
+  
+  public byte[] getOutKey() { // the stored array itself (may be null)
+    return outKey;
+  }
+  
+  public byte[] getOutIv() { // the stored array itself (may be null)
+    return outIv;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58c0bb9e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java
index 68e9697..4b53563 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java
@@ -23,6 +23,7 @@ import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
 import java.security.GeneralSecurityException;
 import java.util.EnumSet;
 import java.util.Queue;
@@ -57,7 +58,8 @@ import com.google.common.base.Preconditions;
 @InterfaceStability.Evolving
 public class CryptoInputStream extends FilterInputStream implements 
     Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor, 
-    CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess {
+    CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess, 
+    ReadableByteChannel {
   private static final byte[] oneByteBuf = new byte[1];
   private final CryptoCodec codec;
   private final Decryptor decryptor;
@@ -92,6 +94,8 @@ public class CryptoInputStream extends FilterInputStream implements
   private final byte[] key;
   private final byte[] initIV;
   private byte[] iv;
+  private final boolean isByteBufferReadable;
+  private final boolean isReadableByteChannel;
   
   /** DirectBuffer pool */
   private final Queue<ByteBuffer> bufferPool = 
@@ -115,6 +119,8 @@ public class CryptoInputStream extends FilterInputStream implements
     this.initIV = iv.clone();
     this.iv = iv.clone();
     this.streamOffset = streamOffset;
+    isByteBufferReadable = in instanceof ByteBufferReadable;
+    isReadableByteChannel = in instanceof ReadableByteChannel;
     inBuffer = ByteBuffer.allocateDirect(this.bufferSize);
     outBuffer = ByteBuffer.allocateDirect(this.bufferSize);
     decryptor = getDecryptor();
@@ -165,9 +171,11 @@ public class CryptoInputStream extends FilterInputStream implements
        * it can avoid bytes copy.
        */
       if (usingByteBufferRead == null) {
-        if (in instanceof ByteBufferReadable) {
+        if (isByteBufferReadable || isReadableByteChannel) {
           try {
-            n = ((ByteBufferReadable) in).read(inBuffer);
+            n = isByteBufferReadable ? 
+                ((ByteBufferReadable) in).read(inBuffer) : 
+                  ((ReadableByteChannel) in).read(inBuffer);
             usingByteBufferRead = Boolean.TRUE;
           } catch (UnsupportedOperationException e) {
             usingByteBufferRead = Boolean.FALSE;
@@ -180,7 +188,8 @@ public class CryptoInputStream extends FilterInputStream implements
         }
       } else {
         if (usingByteBufferRead) {
-          n = ((ByteBufferReadable) in).read(inBuffer);
+          n = isByteBufferReadable ? ((ByteBufferReadable) in).read(inBuffer) : 
+                ((ReadableByteChannel) in).read(inBuffer);
         } else {
           n = readFromUnderlyingStream(inBuffer);
         }
@@ -450,7 +459,7 @@ public class CryptoInputStream extends FilterInputStream implements
   @Override
   public int read(ByteBuffer buf) throws IOException {
     checkStream();
-    if (in instanceof ByteBufferReadable) {
+    if (isByteBufferReadable || isReadableByteChannel) {
       final int unread = outBuffer.remaining();
       if (unread > 0) { // Have unread decrypted data in buffer.
         int toRead = buf.remaining();
@@ -466,7 +475,8 @@ public class CryptoInputStream extends FilterInputStream implements
       }
       
       final int pos = buf.position();
-      final int n = ((ByteBufferReadable) in).read(buf);
+      final int n = isByteBufferReadable ? ((ByteBufferReadable) in).read(buf) : 
+            ((ReadableByteChannel) in).read(buf);
       if (n > 0) {
         streamOffset += n; // Read n bytes
         decrypt(buf, n, pos);
@@ -481,10 +491,22 @@ public class CryptoInputStream extends FilterInputStream implements
           return unread;
         }
       }
+    } else {
+      int n = 0;
+      if (buf.hasArray()) {
+        n = read(buf.array(), buf.position(), buf.remaining());
+        if (n > 0) {
+          buf.position(buf.position() + n);
+        }
+      } else {
+        byte[] tmp = new byte[buf.remaining()];
+        n = read(tmp);
+        if (n > 0) {
+          buf.put(tmp, 0, n);
+        }
+      }
+      return n;
     }
-
-    throw new UnsupportedOperationException("ByteBuffer read unsupported " +
-        "by input stream.");
   }
   
   /**
@@ -686,4 +708,9 @@ public class CryptoInputStream extends FilterInputStream implements
       decryptorPool.add(decryptor);
     }
   }
+
+  @Override // required by ReadableByteChannel, which this class now implements
+  public boolean isOpen() {
+    return !closed;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58c0bb9e/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index e18b935..13b2e9f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -663,6 +663,8 @@ Release 2.6.0 - UNRELEASED
     HDFS-7122. Use of ThreadLocal<Random> results in poor block placement.
     (wang)
 
+    HDFS-6606. Optimize HDFS Encrypted Transport performance. (yliu)
+
   BUG FIXES
 
     HDFS-6823. dfs.web.authentication.kerberos.principal shows up in logs for 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58c0bb9e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 97ffdde..5ba6273 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -686,7 +686,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
       this.initThreadsNumForHedgedReads(numThreads);
     }
     this.saslClient = new SaslDataTransferClient(
-      DataTransferSaslUtil.getSaslPropertiesResolver(conf),
+      conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf),
       TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth);
   }
   

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58c0bb9e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 5d7d252..59eaa20 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -600,6 +600,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   // Security-related configs
   public static final String DFS_ENCRYPT_DATA_TRANSFER_KEY = "dfs.encrypt.data.transfer";
   public static final boolean DFS_ENCRYPT_DATA_TRANSFER_DEFAULT = false;
+  public static final String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY = "dfs.encrypt.data.transfer.cipher.key.bitlength";
+  public static final int    DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT = 128;
   public static final String DFS_DATA_ENCRYPTION_ALGORITHM_KEY = "dfs.encrypt.data.transfer.algorithm";
   public static final String DFS_TRUSTEDCHANNEL_RESOLVER_CLASS = "dfs.trustedchannel.resolver.class";
   public static final String DFS_DATA_TRANSFER_PROTECTION_KEY = "dfs.data.transfer.protection";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58c0bb9e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java
index 81d740f..2d5e13c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java
@@ -21,6 +21,8 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_PROT
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT;
 import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
 
 import java.io.IOException;
@@ -28,6 +30,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.InetAddress;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import javax.security.sasl.Sasl;
@@ -35,10 +38,18 @@ import javax.security.sasl.Sasl;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CipherOption;
+import org.apache.hadoop.crypto.CipherSuite;
+import org.apache.hadoop.crypto.CryptoCodec;
+import org.apache.hadoop.crypto.CryptoInputStream;
+import org.apache.hadoop.crypto.CryptoOutputStream;
 import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CipherOptionProto;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.security.SaslPropertiesResolver;
 import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection;
 import org.slf4j.Logger;
@@ -95,6 +106,19 @@ public final class DataTransferSaslUtil {
         "requested = %s, negotiated = %s", requestedQop, negotiatedQop));
     }
   }
+  
+  /**
+   * Check whether the requested SASL QOP list contains privacy ("auth-conf").
+   * A missing QOP property yields false; blanks around commas are ignored.
+   * @param saslProps properties of SASL negotiation
+   * @return boolean true if privacy exists
+   */
+  public static boolean requestedQopContainsPrivacy(
+      Map<String, String> saslProps) {
+    String qop = saslProps.get(Sasl.QOP); // null when QOP was never set
+    return qop != null
+        && Arrays.asList(qop.trim().split("\\s*,\\s*")).contains("auth-conf");
+  }
 
   /**
    * Creates SASL properties required for an encrypted SASL negotiation.
@@ -177,20 +201,6 @@ public final class DataTransferSaslUtil {
   }
 
   /**
-   * Performs the first step of SASL negotiation.
-   *
-   * @param out connection output stream
-   * @param in connection input stream
-   * @param sasl participant
-   */
-  public static void performSaslStep1(OutputStream out, InputStream in,
-      SaslParticipant sasl) throws IOException {
-    byte[] remoteResponse = readSaslMessage(in);
-    byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
-    sendSaslMessage(out, localResponse);
-  }
-
-  /**
    * Reads a SASL negotiation message.
    *
    * @param in stream to read
@@ -208,6 +218,124 @@ public final class DataTransferSaslUtil {
       return proto.getPayload().toByteArray();
     }
   }
+  
+  /**
+   * Reads a SASL negotiation message and appends any cipher options carried
+   * with it to cipherOptions.
+   * @param in stream to read
+   * @param cipherOptions output list; each received option is added to it
+   * @return byte[] payload of the SASL negotiation message
+   * @throws IOException if the message reports an error status or can't be read
+   */
+  public static byte[] readSaslMessageAndNegotiationCipherOptions(
+      InputStream in, List<CipherOption> cipherOptions) throws IOException {
+    DataTransferEncryptorMessageProto proto =
+        DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in));
+    if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
+      throw new InvalidEncryptionKeyException(proto.getMessage());
+    } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
+      throw new IOException(proto.getMessage());
+    } else {
+      List<CipherOptionProto> optionProtos = proto.getCipherOptionList();
+      if (optionProtos != null) {
+        for (CipherOptionProto optionProto : optionProtos) {
+          cipherOptions.add(PBHelper.convert(optionProto));
+        }
+      }
+      return proto.getPayload().toByteArray();
+    }
+  }
+  
+  /**
+   * Pick the first offered AES/CTR/NoPadding option and give it random keys/ivs.
+   * @param conf supplies the key bit length and the CryptoCodec instance
+   * @param options the cipher options which client supports, may be null
+   * @return CipherOption negotiated cipher option, or null if none matched
+   */
+  public static CipherOption negotiateCipherOption(Configuration conf,
+      List<CipherOption> options) {
+    if (options != null) {
+      for (CipherOption option : options) {
+        // Currently we support AES/CTR/NoPadding
+        CipherSuite suite = option.getCipherSuite();
+        if (suite == CipherSuite.AES_CTR_NOPADDING) {
+          int keyLen = conf.getInt(
+              DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY,
+              DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT) / 8; // bits -> bytes
+          CryptoCodec codec = CryptoCodec.getInstance(conf, suite);
+          byte[] inKey = new byte[keyLen];
+          byte[] inIv = new byte[suite.getAlgorithmBlockSize()];
+          byte[] outKey = new byte[keyLen];
+          byte[] outIv = new byte[suite.getAlgorithmBlockSize()];
+          codec.generateSecureRandom(inKey);
+          codec.generateSecureRandom(inIv);
+          codec.generateSecureRandom(outKey);
+          codec.generateSecureRandom(outIv);
+          return new CipherOption(suite, inKey, inIv, outKey, outIv);
+        }
+      }
+    }
+    return null;
+  }
+  
+  /**
+   * Send SASL message and negotiated cipher option to client, with status
+   * SUCCESS; the message is written with writeDelimitedTo and then flushed.
+   * @param out stream to receive message
+   * @param payload to send; left out of the message when null
+   * @param option negotiated cipher option; left out of the message when null
+   * @throws IOException for any error
+   */
+  public static void sendSaslMessageAndNegotiatedCipherOption(
+      OutputStream out, byte[] payload, CipherOption option) 
+          throws IOException {
+    DataTransferEncryptorMessageProto.Builder builder =
+        DataTransferEncryptorMessageProto.newBuilder();
+    
+    builder.setStatus(DataTransferEncryptorStatus.SUCCESS);
+    if (payload != null) {
+      builder.setPayload(ByteString.copyFrom(payload));
+    }
+    if (option != null) {
+      builder.addCipherOption(PBHelper.convert(option));
+    }
+    
+    DataTransferEncryptorMessageProto proto = builder.build();
+    proto.writeDelimitedTo(out);
+    out.flush();
+  }
+  
+  /**
+   * Create IOStreamPair of {@link org.apache.hadoop.crypto.CryptoInputStream}
+   * and {@link org.apache.hadoop.crypto.CryptoOutputStream}. The server reads
+   * with the "in" key/iv and writes with the "out" pair; the client is mirrored.
+   * @param conf the configuration
+   * @param cipherOption negotiated cipher option; dereferenced, so not null
+   * @param out underlying output stream
+   * @param in underlying input stream
+   * @param isServer is server side
+   * @return IOStreamPair the stream pair
+   * @throws IOException for any error
+   */
+  public static IOStreamPair createStreamPair(Configuration conf,
+      CipherOption cipherOption, OutputStream out, InputStream in, 
+      boolean isServer) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Creating IOStreamPair of CryptoInputStream and " +
+          "CryptoOutputStream.");
+    }
+    CryptoCodec codec = CryptoCodec.getInstance(conf, 
+        cipherOption.getCipherSuite());
+    byte[] inKey = cipherOption.getInKey();
+    byte[] inIv = cipherOption.getInIv();
+    byte[] outKey = cipherOption.getOutKey();
+    byte[] outIv = cipherOption.getOutIv();
+    InputStream cIn = new CryptoInputStream(in, codec, 
+        isServer ? inKey : outKey, isServer ? inIv : outIv); // client: "out" pair
+    OutputStream cOut = new CryptoOutputStream(out, codec, 
+        isServer ? outKey : inKey, isServer ? outIv : inIv); // client: "in" pair
+    return new IOStreamPair(cIn, cOut);
+  }
 
   /**
    * Sends a SASL negotiation message indicating an error.
@@ -232,6 +360,116 @@ public final class DataTransferSaslUtil {
       throws IOException {
     sendSaslMessage(out, DataTransferEncryptorStatus.SUCCESS, payload, null);
   }
+  
+  /**
+   * Send a SASL negotiation message and negotiation cipher options to server,
+   * with status SUCCESS; written with writeDelimitedTo and then flushed.
+   * @param out stream to receive message
+   * @param payload to send; left out of the message when null
+   * @param options cipher options to negotiate; none are added when null
+   * @throws IOException for any error
+   */
+  public static void sendSaslMessageAndNegotiationCipherOptions(
+      OutputStream out, byte[] payload, List<CipherOption> options)
+          throws IOException {
+    DataTransferEncryptorMessageProto.Builder builder =
+        DataTransferEncryptorMessageProto.newBuilder();
+    
+    builder.setStatus(DataTransferEncryptorStatus.SUCCESS);
+    if (payload != null) {
+      builder.setPayload(ByteString.copyFrom(payload));
+    }
+    if (options != null) {
+      builder.addAllCipherOption(PBHelper.convertCipherOptions(options));
+    }
+    
+    DataTransferEncryptorMessageProto proto = builder.build();
+    proto.writeDelimitedTo(out);
+    out.flush();
+  }
+  
+  /**
+   * Read SASL message and negotiated cipher option from server. Only the
+   * first cipher option in the message is used; any others are ignored.
+   * @param in stream to read
+   * @return SaslResponseWithNegotiatedCipherOption SASL message and 
+   * negotiated cipher option (null cipher option if the message has none)
+   * @throws IOException if the message reports an error status or can't be read
+   */
+  public static SaslResponseWithNegotiatedCipherOption
+      readSaslMessageAndNegotiatedCipherOption(InputStream in)
+          throws IOException {
+    DataTransferEncryptorMessageProto proto =
+        DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in));
+    if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
+      throw new InvalidEncryptionKeyException(proto.getMessage());
+    } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
+      throw new IOException(proto.getMessage());
+    } else {
+      byte[] response = proto.getPayload().toByteArray();
+      List<CipherOption> options = PBHelper.convertCipherOptionProtos(
+          proto.getCipherOptionList());
+      CipherOption option = null;
+      if (options != null && !options.isEmpty()) {
+        option = options.get(0);
+      }
+      return new SaslResponseWithNegotiatedCipherOption(response, option);
+    }
+  }
+  
+  /**
+   * Wrap the in/out keys of the negotiated cipher option with SASL. The ivs
+   * are passed through to the result unchanged, i.e. they are not wrapped.
+   * @param option negotiated cipher option, may be null
+   * @param sasl SASL participant representing server
+   * @return CipherOption carrying the wrapped keys and the original ivs,
+   * or null if option is null
+   * @throws IOException for any error
+   */
+  public static CipherOption wrap(CipherOption option, SaslParticipant sasl) 
+      throws IOException {
+    if (option != null) {
+      byte[] inKey = option.getInKey();
+      if (inKey != null) {
+        inKey = sasl.wrap(inKey, 0, inKey.length);
+      }
+      byte[] outKey = option.getOutKey();
+      if (outKey != null) {
+        outKey = sasl.wrap(outKey, 0, outKey.length);
+      }
+      return new CipherOption(option.getCipherSuite(), inKey, option.getInIv(),
+          outKey, option.getOutIv());
+    }
+    
+    return null;
+  }
+  
+  /**
+   * Unwrap the in/out keys of the negotiated cipher option with SASL. The ivs
+   * are passed through to the result unchanged, i.e. they are not unwrapped.
+   * @param option negotiated cipher option, may be null
+   * @param sasl SASL participant representing client
+   * @return CipherOption carrying the unwrapped keys and the original ivs,
+   * or null if option is null
+   * @throws IOException for any error
+   */
+  public static CipherOption unwrap(CipherOption option, SaslParticipant sasl)
+      throws IOException {
+    if (option != null) {
+      byte[] inKey = option.getInKey();
+      if (inKey != null) {
+        inKey = sasl.unwrap(inKey, 0, inKey.length);
+      }
+      byte[] outKey = option.getOutKey();
+      if (outKey != null) {
+        outKey = sasl.unwrap(outKey, 0, outKey.length);
+      }
+      return new CipherOption(option.getCipherSuite(), inKey, option.getInIv(),
+          outKey, option.getOutIv());
+    }
+    
+    return null;
+  }
 
   /**
    * Sends a SASL negotiation message.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58c0bb9e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java
index 9df9929..cfcc91f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
 import static org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.*;
 
 import java.io.DataInputStream;
@@ -27,6 +26,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.Socket;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -40,6 +40,9 @@ import javax.security.sasl.RealmChoiceCallback;
 
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CipherOption;
+import org.apache.hadoop.crypto.CipherSuite;
 import org.apache.hadoop.hdfs.net.EncryptedPeer;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
@@ -54,6 +57,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
 
 /**
  * Negotiates SASL for DataTransferProtocol on behalf of a client.  There are
@@ -72,6 +76,7 @@ public class SaslDataTransferClient {
   private static final Logger LOG = LoggerFactory.getLogger(
     SaslDataTransferClient.class);
 
+  private final Configuration conf;
   private final AtomicBoolean fallbackToSimpleAuth;
   private final SaslPropertiesResolver saslPropsResolver;
   private final TrustedChannelResolver trustedChannelResolver;
@@ -82,27 +87,32 @@ public class SaslDataTransferClient {
    * simple auth.  For intra-cluster connections between data nodes in the same
    * cluster, we can assume that all run under the same security configuration.
    *
+   * @param conf the configuration
    * @param saslPropsResolver for determining properties of SASL negotiation
    * @param trustedChannelResolver for identifying trusted connections that do
    *   not require SASL negotiation
    */
-  public SaslDataTransferClient(SaslPropertiesResolver saslPropsResolver,
+  public SaslDataTransferClient(Configuration conf, 
+      SaslPropertiesResolver saslPropsResolver,
       TrustedChannelResolver trustedChannelResolver) {
-    this(saslPropsResolver, trustedChannelResolver, null);
+    this(conf, saslPropsResolver, trustedChannelResolver, null);
   }
 
   /**
    * Creates a new SaslDataTransferClient.
    *
+   * @param conf the configuration
    * @param saslPropsResolver for determining properties of SASL negotiation
    * @param trustedChannelResolver for identifying trusted connections that do
    *   not require SASL negotiation
    * @param fallbackToSimpleAuth checked on each attempt at general SASL
    *   handshake, if true forces use of simple auth
    */
-  public SaslDataTransferClient(SaslPropertiesResolver saslPropsResolver,
+  public SaslDataTransferClient(Configuration conf, 
+      SaslPropertiesResolver saslPropsResolver,
       TrustedChannelResolver trustedChannelResolver,
       AtomicBoolean fallbackToSimpleAuth) {
+    this.conf = conf;
     this.fallbackToSimpleAuth = fallbackToSimpleAuth;
     this.saslPropsResolver = saslPropsResolver;
     this.trustedChannelResolver = trustedChannelResolver;
@@ -436,17 +446,38 @@ public class SaslDataTransferClient {
       sendSaslMessage(out, new byte[0]);
 
       // step 1
-      performSaslStep1(out, in, sasl);
-
-      // step 2 (client-side only)
       byte[] remoteResponse = readSaslMessage(in);
       byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
+      List<CipherOption> cipherOptions = null;
+      if (requestedQopContainsPrivacy(saslProps)) {
+        // Negotiation cipher options
+        CipherOption option = new CipherOption(CipherSuite.AES_CTR_NOPADDING);
+        cipherOptions = Lists.newArrayListWithCapacity(1);
+        cipherOptions.add(option);
+      }
+      sendSaslMessageAndNegotiationCipherOptions(out, localResponse, 
+          cipherOptions);
+
+      // step 2 (client-side only)
+      SaslResponseWithNegotiatedCipherOption response = 
+          readSaslMessageAndNegotiatedCipherOption(in);
+      localResponse = sasl.evaluateChallengeOrResponse(response.payload);
       assert localResponse == null;
 
       // SASL handshake is complete
       checkSaslComplete(sasl, saslProps);
 
-      return sasl.createStreamPair(out, in);
+      CipherOption cipherOption = null;
+      if (sasl.isNegotiatedQopPrivacy()) {
+        // Unwrap the negotiated cipher option
+        cipherOption = unwrap(response.cipherOption, sasl);
+      }
+
+      // If negotiated cipher option is not null, we will use it to create 
+      // stream pair.
+      return cipherOption != null ? createStreamPair(
+          conf, cipherOption, underlyingOut, underlyingIn, false) : 
+            sasl.createStreamPair(out, in);
     } catch (IOException ioe) {
       sendGenericSaslErrorMessage(out, ioe.getMessage());
       throw ioe;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58c0bb9e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java
index 2b82c82..005856d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java
@@ -26,6 +26,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.List;
 import java.util.Map;
 
 import javax.security.auth.callback.Callback;
@@ -39,6 +40,7 @@ import javax.security.sasl.SaslException;
 
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.crypto.CipherOption;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
@@ -53,6 +55,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
 
 /**
  * Negotiates SASL for DataTransferProtocol on behalf of a server.  There are
@@ -351,17 +354,40 @@ public class SaslDataTransferServer {
     }
     try {
       // step 1
-      performSaslStep1(out, in, sasl);
-
-      // step 2 (server-side only)
       byte[] remoteResponse = readSaslMessage(in);
       byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
       sendSaslMessage(out, localResponse);
 
+      // step 2 (server-side only)
+      List<CipherOption> cipherOptions = Lists.newArrayList();
+      remoteResponse = readSaslMessageAndNegotiationCipherOptions(
+          in, cipherOptions);
+      localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
+
       // SASL handshake is complete
       checkSaslComplete(sasl, saslProps);
 
-      return sasl.createStreamPair(out, in);
+      CipherOption cipherOption = null;
+      if (sasl.isNegotiatedQopPrivacy()) {
+        // Negotiate a cipher option
+        cipherOption = negotiateCipherOption(dnConf.getConf(), cipherOptions);
+        if (cipherOption != null) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Server using cipher suite " + 
+                cipherOption.getCipherSuite().getName());
+          }
+        }
+      }
+
+      // If negotiated cipher option is not null, wrap it before sending.
+      sendSaslMessageAndNegotiatedCipherOption(out, localResponse, 
+          wrap(cipherOption, sasl));
+
+      // If negotiated cipher option is not null, we will use it to create 
+      // stream pair.
+      return cipherOption != null ? createStreamPair(
+          dnConf.getConf(), cipherOption, underlyingOut, underlyingIn, true) : 
+            sasl.createStreamPair(out, in);
     } catch (IOException ioe) {
       if (ioe instanceof SaslException &&
           ioe.getCause() != null &&

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58c0bb9e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java
index 106e297..f14a075 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java
@@ -129,6 +129,50 @@ class SaslParticipant {
       return (String) saslServer.getNegotiatedProperty(Sasl.QOP);
     }
   }
+  
+  /**
+   * Reports whether the QOP negotiated by the SASL exchange is "auth-conf".
+   * Intended to be called after successful SASL negotiation.
+   *
+   * @return true if the negotiated QOP equals "auth-conf", ignoring case
+   */
+  public boolean isNegotiatedQopPrivacy() {
+    // equalsIgnoreCase(null) is false, so a null QOP yields false.
+    return "auth-conf".equalsIgnoreCase(getNegotiatedQop());
+  }
+  
+  /**
+   * Wraps bytes with the SASL client if one is set, otherwise with the
+   * SASL server.
+   *
+   * @param bytes the array holding the bytes to wrap
+   * @param off the offset in the array at which the bytes start
+   * @param len the number of bytes to wrap
+   * @return the wrapped bytes
+   * @throws SaslException if the bytes cannot be successfully wrapped
+   */
+  public byte[] wrap(byte[] bytes, int off, int len) throws SaslException {
+    if (saslClient == null) {
+      return saslServer.wrap(bytes, off, len);
+    }
+    return saslClient.wrap(bytes, off, len);
+  }
+  
+  /**
+   * Unwraps bytes with the SASL client if one is set, otherwise with the
+   * SASL server.
+   *
+   * @param bytes the array holding the bytes to unwrap
+   * @param off the offset in the array at which the bytes start
+   * @param len the number of bytes to unwrap
+   * @return the unwrapped bytes
+   * @throws SaslException if the bytes cannot be successfully unwrapped
+   */
+  public byte[] unwrap(byte[] bytes, int off, int len) throws SaslException {
+    if (saslClient == null) {
+      return saslServer.unwrap(bytes, off, len);
+    }
+    return saslClient.unwrap(bytes, off, len);
+  }
 
   /**
    * Returns true if SASL negotiation is complete.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58c0bb9e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslResponseWithNegotiatedCipherOption.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslResponseWithNegotiatedCipherOption.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslResponseWithNegotiatedCipherOption.java
new file mode 100644
index 0000000..f69441b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslResponseWithNegotiatedCipherOption.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.crypto.CipherOption;
+
+/**
+ * Pairs the payload of a SASL response message with the cipher option that
+ * was read from the same message.
+ */
+@InterfaceAudience.Private
+public class SaslResponseWithNegotiatedCipherOption {
+  // SASL payload bytes; the client feeds these to evaluateChallengeOrResponse.
+  final byte[] payload;
+  // Cipher option carried with the response. NOTE(review): presumably null
+  // when no cipher option was negotiated -- confirm in DataTransferSaslUtil.
+  final CipherOption cipherOption;
+  
+  /**
+   * @param payload the SASL payload bytes of the response
+   * @param cipherOption the cipher option that accompanied the payload
+   */
+  public SaslResponseWithNegotiatedCipherOption(byte[] payload, 
+      CipherOption cipherOption) {
+    this.payload = payload;
+    this.cipherOption = cipherOption;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58c0bb9e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index e48e85f..53ea6cf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats;
 import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.CachePoolStats;
+import org.apache.hadoop.crypto.CipherOption;
 import org.apache.hadoop.crypto.CipherSuite;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
@@ -128,6 +129,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockWithLocationsProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlocksWithLocationsProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointSignatureProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CipherOptionProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ContentSummaryProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CorruptFileBlocksProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DataEncryptionKeyProto;
@@ -2689,6 +2691,83 @@ public class PBHelper {
     return GetEditsFromTxidResponseProto.newBuilder().setEventsList(
         builder.build()).build();
   }
+  
+  /**
+   * Converts a {@link CipherOption} into its protobuf message. Only the
+   * fields that are non-null on the option are set on the message.
+   *
+   * @param option the option to convert, may be null
+   * @return the protobuf message, or null if option is null
+   */
+  public static CipherOptionProto convert(CipherOption option) {
+    if (option == null) {
+      return null;
+    }
+    final CipherOptionProto.Builder proto = CipherOptionProto.newBuilder();
+    final CipherSuite suite = option.getCipherSuite();
+    if (suite != null) {
+      proto.setSuite(convert(suite));
+    }
+    final byte[] inKey = option.getInKey();
+    if (inKey != null) {
+      proto.setInKey(ByteString.copyFrom(inKey));
+    }
+    final byte[] inIv = option.getInIv();
+    if (inIv != null) {
+      proto.setInIv(ByteString.copyFrom(inIv));
+    }
+    final byte[] outKey = option.getOutKey();
+    if (outKey != null) {
+      proto.setOutKey(ByteString.copyFrom(outKey));
+    }
+    final byte[] outIv = option.getOutIv();
+    if (outIv != null) {
+      proto.setOutIv(ByteString.copyFrom(outIv));
+    }
+    return proto.build();
+  }
+  
+  /**
+   * Converts a protobuf cipher option into a {@link CipherOption}.
+   *
+   * @param proto the message to convert, may be null
+   * @return the converted option, with every field that is not set on the
+   *         message left as null, or null if proto is null
+   */
+  public static CipherOption convert(CipherOptionProto proto) {
+    if (proto == null) {
+      return null;
+    }
+    // Generated protobuf getters never return null; an unset field yields its
+    // default value. Presence therefore has to be tested with has*(), otherwise
+    // an absent key or IV would be turned into an empty array and would not
+    // round-trip with convert(CipherOption), which skips null fields.
+    CipherSuite suite = proto.hasSuite() ? convert(proto.getSuite()) : null;
+    byte[] inKey = proto.hasInKey() ? proto.getInKey().toByteArray() : null;
+    byte[] inIv = proto.hasInIv() ? proto.getInIv().toByteArray() : null;
+    byte[] outKey = proto.hasOutKey() ? proto.getOutKey().toByteArray() : null;
+    byte[] outIv = proto.hasOutIv() ? proto.getOutIv().toByteArray() : null;
+    return new CipherOption(suite, inKey, inIv, outKey, outIv);
+  }
+  
+  /**
+   * Converts a list of cipher options to protobuf messages, keeping order.
+   *
+   * @param options the options to convert, may be null
+   * @return a new list of converted messages, or null if options is null
+   */
+  public static List<CipherOptionProto> convertCipherOptions(
+      List<CipherOption> options) {
+    if (options == null) {
+      return null;
+    }
+    final List<CipherOptionProto> converted =
+        Lists.newArrayListWithCapacity(options.size());
+    for (final CipherOption each : options) {
+      converted.add(convert(each));
+    }
+    return converted;
+  }
+  
+  /**
+   * Converts a list of protobuf cipher options to {@link CipherOption}s,
+   * keeping order.
+   *
+   * @param protos the messages to convert, may be null
+   * @return a new list of converted options, or null if protos is null
+   */
+  public static List<CipherOption> convertCipherOptionProtos(
+      List<CipherOptionProto> protos) {
+    if (protos == null) {
+      return null;
+    }
+    final List<CipherOption> converted =
+        Lists.newArrayListWithCapacity(protos.size());
+    for (final CipherOptionProto each : protos) {
+      converted.add(convert(each));
+    }
+    return converted;
+  }
 
   public static CipherSuiteProto convert(CipherSuite suite) {
     switch (suite) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58c0bb9e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index cea1ab7..8b881e9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -785,7 +785,7 @@ public class Dispatcher {
         : Executors.newFixedThreadPool(dispatcherThreads);
     this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode;
 
-    this.saslClient = new SaslDataTransferClient(
+    this.saslClient = new SaslDataTransferClient(conf,
         DataTransferSaslUtil.getSaslPropertiesResolver(conf),
         TrustedChannelResolver.getInstance(conf), nnc.fallbackToSimpleAuth);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58c0bb9e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
index 3127682..67cd1ce 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.security.SaslPropertiesResolver;
  */
 @InterfaceAudience.Private
 public class DNConf {
+  final Configuration conf;
   final int socketTimeout;
   final int socketWriteTimeout;
   final int socketKeepaliveTimeout;
@@ -100,6 +101,7 @@ public class DNConf {
   final long maxLockedMemory;
 
   public DNConf(Configuration conf) {
+    this.conf = conf;
     socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
         HdfsServerConstants.READ_TIMEOUT);
     socketWriteTimeout = conf.getInt(DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
@@ -197,6 +199,15 @@ public class DNConf {
   String getMinimumNameNodeVersion() {
     return this.minimumNameNodeVersion;
   }
+  
+  /**
+   * Returns the configuration this DNConf was constructed with.
+   *
+   * @return the configuration passed to the constructor
+   */
+  public Configuration getConf() {
+    return this.conf;
+  }
 
   /**
    * Returns true if encryption enabled for DataTransferProtocol.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58c0bb9e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index e4b5425..32c383f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1070,8 +1070,8 @@ public class DataNode extends ReconfigurableBase
     // Create the ReadaheadPool from the DataNode context so we can
     // exit without having to explicitly shutdown its thread pool.
     readaheadPool = ReadaheadPool.getInstance();
-    saslClient = new SaslDataTransferClient(dnConf.saslPropsResolver,
-      dnConf.trustedChannelResolver);
+    saslClient = new SaslDataTransferClient(dnConf.conf, 
+        dnConf.saslPropsResolver, dnConf.trustedChannelResolver);
     saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58c0bb9e/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
index 50cc00d..fd1ba8a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
@@ -42,6 +42,7 @@ message DataTransferEncryptorMessageProto {
   required DataTransferEncryptorStatus status = 1;
   optional bytes payload = 2;
   optional string message = 3;
+  repeated CipherOptionProto cipherOption = 4;
 }
 
 message BaseHeaderProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58c0bb9e/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
index 10af3b8..04a8f3f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
@@ -265,6 +265,17 @@ message ZoneEncryptionInfoProto {
 }
 
 /**
+ * Cipher option
+ */
+message CipherOptionProto {
+  required CipherSuiteProto suite = 1;  // cipher suite of this option
+  // The four fields below are mapped by PBHelper to CipherOption's inKey,
+  // inIv, outKey and outIv. NOTE(review): presumably "in"/"out" refer to the
+  // two stream directions -- confirm against DataTransferSaslUtil.
+  optional bytes inKey = 2;
+  optional bytes inIv = 3;
+  optional bytes outKey = 4;
+  optional bytes outIv = 5;
+}
+
+/**
  * A set of file blocks and their locations.
  */
 message LocatedBlocksProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58c0bb9e/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index a4282b5..38d3c50 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -1514,6 +1514,18 @@
     the configured JCE default on the system is used (usually 3DES.) It is
     widely believed that 3DES is more cryptographically secure, but RC4 is
     substantially faster.
+    
+    Note that if AES is supported by both the client and server then this 
+    encryption algorithm will only be used to initially transfer keys for AES.
+  </description>
+</property>
+
+<property>
+  <name>dfs.encrypt.data.transfer.cipher.key.bitlength</name>
+  <value>128</value>
+  <description>
+    The key bit length negotiated by the DFS client and the datanode for
+    encryption. This value may be set to 128, 192, or 256.
   </description>
 </property>
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58c0bb9e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java
index 131912d..7f6ad1a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java
@@ -37,11 +37,15 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferServer;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -50,6 +54,10 @@ import org.mockito.Mockito;
 
 @RunWith(Parameterized.class)
 public class TestEncryptedTransfer {
+  {
+    LogManager.getLogger(SaslDataTransferServer.class).setLevel(Level.DEBUG);
+    LogManager.getLogger(DataTransferSaslUtil.class).setLevel(Level.DEBUG);
+  }
   
   @Parameters
   public static Collection<Object[]> data() {
@@ -111,9 +119,28 @@ public class TestEncryptedTransfer {
           .build();
       
       fs = getFileSystem(conf);
-      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
+      LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
+          LogFactory.getLog(SaslDataTransferServer.class));
+      LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs(
+          LogFactory.getLog(DataTransferSaslUtil.class));
+      try {
+        assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+        assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
+      } finally {
+        logs.stopCapturing();
+        logs1.stopCapturing();
+      }
+      
       fs.close();
+      
+      if (resolverClazz != null && !resolverClazz.endsWith("TestTrustedChannelResolver")){
+        // Test client and server negotiate cipher option
+        GenericTestUtils.assertMatches(logs.getOutput(), 
+            "Server using cipher suite");
+        // Check the IOStreamPair
+        GenericTestUtils.assertMatches(logs1.getOutput(), 
+            "Creating IOStreamPair of CryptoInputStream and CryptoOutputStream.");
+      }
     } finally {
       if (cluster != null) {
         cluster.shutdown();
@@ -403,9 +430,28 @@ public class TestEncryptedTransfer {
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDns).build();
       
       FileSystem fs = getFileSystem(conf);
-      writeTestDataToFile(fs);
+      
+      LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
+          LogFactory.getLog(SaslDataTransferServer.class));
+      LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs(
+          LogFactory.getLog(DataTransferSaslUtil.class));
+      try {
+        writeTestDataToFile(fs);
+      } finally {
+        logs.stopCapturing();
+        logs1.stopCapturing();
+      }
       assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
       fs.close();
+      
+      if (resolverClazz != null && !resolverClazz.endsWith("TestTrustedChannelResolver")){
+        // Test client and server negotiate cipher option
+        GenericTestUtils.assertMatches(logs.getOutput(), 
+            "Server using cipher suite");
+        // Check the IOStreamPair
+        GenericTestUtils.assertMatches(logs1.getOutput(), 
+            "Creating IOStreamPair of CryptoInputStream and CryptoOutputStream.");
+      }
     } finally {
       if (cluster != null) {
         cluster.shutdown();