You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vv...@apache.org on 2015/09/07 17:46:41 UTC
[39/50] [abbrv] hadoop git commit: HDFS-9002. Move o.a.h.hdfs.net/*Peer classes to hdfs-client. Contributed by Mingliang Liu.
HDFS-9002. Move o.a.h.hdfs.net/*Peer classes to hdfs-client. Contributed by Mingliang Liu.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ed78b14e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ed78b14e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ed78b14e
Branch: refs/heads/YARN-3926
Commit: ed78b14ebc9a21bb57ccd088e8b49bfa457a396f
Parents: c2d2c18
Author: Haohui Mai <wh...@apache.org>
Authored: Thu Sep 3 15:32:53 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Thu Sep 3 15:32:53 2015 -0700
----------------------------------------------------------------------
.../dev-support/findbugsExcludeFile.xml | 1 +
.../org/apache/hadoop/hdfs/DFSUtilClient.java | 64 +++
.../hdfs/client/HdfsClientConfigKeys.java | 14 +
.../apache/hadoop/hdfs/net/BasicInetPeer.java | 133 +++++
.../apache/hadoop/hdfs/net/EncryptedPeer.java | 142 +++++
.../org/apache/hadoop/hdfs/net/NioInetPeer.java | 136 +++++
.../java/org/apache/hadoop/hdfs/net/Peer.java | 8 +-
.../protocol/datatransfer/IOStreamPair.java | 37 ++
.../datatransfer/TrustedChannelResolver.java | 81 +++
.../sasl/DataEncryptionKeyFactory.java | 38 ++
.../datatransfer/sasl/DataTransferSaslUtil.java | 519 +++++++++++++++++++
.../sasl/SaslDataTransferClient.java | 498 ++++++++++++++++++
.../datatransfer/sasl/SaslParticipant.java | 210 ++++++++
.../SaslResponseWithNegotiatedCipherOption.java | 33 ++
.../hadoop/hdfs/protocolPB/PBHelperClient.java | 102 ++++
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../java/org/apache/hadoop/hdfs/DFSClient.java | 3 +-
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 28 +-
.../apache/hadoop/hdfs/net/BasicInetPeer.java | 133 -----
.../apache/hadoop/hdfs/net/EncryptedPeer.java | 142 -----
.../org/apache/hadoop/hdfs/net/NioInetPeer.java | 136 -----
.../apache/hadoop/hdfs/net/TcpPeerServer.java | 65 +--
.../protocol/datatransfer/IOStreamPair.java | 37 --
.../datatransfer/TrustedChannelResolver.java | 81 ---
.../sasl/DataEncryptionKeyFactory.java | 38 --
.../datatransfer/sasl/DataTransferSaslUtil.java | 519 -------------------
.../sasl/SaslDataTransferClient.java | 498 ------------------
.../sasl/SaslDataTransferServer.java | 2 +-
.../datatransfer/sasl/SaslParticipant.java | 210 --------
.../SaslResponseWithNegotiatedCipherOption.java | 33 --
.../apache/hadoop/hdfs/protocolPB/PBHelper.java | 111 +---
.../hdfs/server/namenode/FSDirXAttrOp.java | 8 +-
.../hdfs/server/namenode/FSDirectory.java | 3 +-
.../hdfs/server/namenode/NamenodeFsck.java | 4 +-
.../apache/hadoop/hdfs/BlockReaderTestUtil.java | 3 +-
.../org/apache/hadoop/hdfs/MiniDFSCluster.java | 2 +-
.../hadoop/hdfs/TestEncryptedTransfer.java | 4 +-
.../sasl/SaslDataTransferTestCase.java | 2 +-
.../datatransfer/sasl/TestSaslDataTransfer.java | 2 +-
.../hdfs/qjournal/TestSecureNNWithQJM.java | 2 +-
.../blockmanagement/TestBlockTokenWithDFS.java | 4 +-
.../datanode/TestDataNodeVolumeFailure.java | 4 +-
42 files changed, 2064 insertions(+), 2029 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed78b14e/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml
index 036ac09..515da24 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml
@@ -14,6 +14,7 @@
<Class name="org.apache.hadoop.hdfs.protocol.SnapshotDiffReport$DiffReportEntry"/>
<Class name="org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus"/>
<Class name="org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport"/>
+ <Class name="org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslResponseWithNegotiatedCipherOption"/>
</Or>
<Bug pattern="EI_EXPOSE_REP,EI_EXPOSE_REP2" />
</Match>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed78b14e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
index a89f556..b032250 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
@@ -27,16 +27,24 @@ import org.apache.hadoop.crypto.key.KeyProviderFactory;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.net.BasicInetPeer;
+import org.apache.hadoop.hdfs.net.NioInetPeer;
+import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolTranslatorPB;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.util.IOUtilsClient;
import org.apache.hadoop.hdfs.web.WebHdfsConstants;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,8 +54,10 @@ import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
+import java.nio.channels.SocketChannel;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Collections;
@@ -523,4 +533,58 @@ public class DFSUtilClient {
}
return keyProvider;
}
+
+ public static Peer peerFromSocket(Socket socket)
+ throws IOException {
+ Peer peer = null;
+ boolean success = false;
+ try {
+ // TCP_NODELAY is crucial here because of bad interactions between
+ // Nagle's Algorithm and Delayed ACKs. With connection keepalive
+ // between the client and DN, the conversation looks like:
+ // 1. Client -> DN: Read block X
+ // 2. DN -> Client: data for block X
+ // 3. Client -> DN: Status OK (successful read)
+ // 4. Client -> DN: Read block Y
+ // The fact that step #3 and #4 are both in the client->DN direction
+ // triggers Nagling. If the DN is using delayed ACKs, this results
+ // in a delay of 40ms or more.
+ //
+ // TCP_NODELAY disables nagling and thus avoids this performance
+ // disaster.
+ socket.setTcpNoDelay(true);
+ SocketChannel channel = socket.getChannel();
+ if (channel == null) {
+ peer = new BasicInetPeer(socket);
+ } else {
+ peer = new NioInetPeer(socket);
+ }
+ success = true;
+ return peer;
+ } finally {
+ if (!success) {
+ if (peer != null) peer.close();
+ socket.close();
+ }
+ }
+ }
+
+ public static Peer peerFromSocketAndKey(
+ SaslDataTransferClient saslClient, Socket s,
+ DataEncryptionKeyFactory keyFactory,
+ Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
+ throws IOException {
+ Peer peer = null;
+ boolean success = false;
+ try {
+ peer = peerFromSocket(s);
+ peer = saslClient.peerSend(peer, keyFactory, blockToken, datanodeId);
+ success = true;
+ return peer;
+ } finally {
+ if (!success) {
+ IOUtilsClient.cleanup(null, peer);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed78b14e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index 96bc8d3..e417fbe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -126,6 +126,20 @@ public interface HdfsClientConfigKeys {
long DFS_DATANODE_READAHEAD_BYTES_DEFAULT = 4 * 1024 * 1024; // 4MB
String DFS_ENCRYPTION_KEY_PROVIDER_URI = "dfs.encryption.key.provider.uri";
+ String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY =
+ "dfs.encrypt.data.transfer.cipher.suites";
+
+ String DFS_DATA_TRANSFER_PROTECTION_KEY = "dfs.data.transfer.protection";
+ String DFS_DATA_TRANSFER_PROTECTION_DEFAULT = "";
+ String DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY =
+ "dfs.data.transfer.saslproperties.resolver.class";
+
+ String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY =
+ "dfs.encrypt.data.transfer.cipher.key.bitlength";
+ int DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT = 128;
+
+ String DFS_TRUSTEDCHANNEL_RESOLVER_CLASS = "dfs.trustedchannel.resolver.class";
+
String REPLICA_ACCESSOR_BUILDER_CLASSES_KEY =
PREFIX + "replica.accessor.builder.classes";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed78b14e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/BasicInetPeer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/BasicInetPeer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/BasicInetPeer.java
new file mode 100644
index 0000000..212dbef
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/BasicInetPeer.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.net;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.nio.channels.ReadableByteChannel;
+
+import org.apache.hadoop.net.unix.DomainSocket;
+
+/**
+ * Represents a peer that we communicate with by using a basic Socket
+ * that has no associated Channel.
+ *
+ */
+public class BasicInetPeer implements Peer {
+ private final Socket socket;
+ private final OutputStream out;
+ private final InputStream in;
+ private final boolean isLocal;
+
+ public BasicInetPeer(Socket socket) throws IOException {
+ this.socket = socket;
+ this.out = socket.getOutputStream();
+ this.in = socket.getInputStream();
+ this.isLocal = socket.getInetAddress().equals(socket.getLocalAddress());
+ }
+
+ @Override
+ public ReadableByteChannel getInputStreamChannel() {
+ /*
+ * This Socket has no channel, so there's nothing to return here.
+ */
+ return null;
+ }
+
+ @Override
+ public void setReadTimeout(int timeoutMs) throws IOException {
+ socket.setSoTimeout(timeoutMs);
+ }
+
+ @Override
+ public int getReceiveBufferSize() throws IOException {
+ return socket.getReceiveBufferSize();
+ }
+
+ @Override
+ public boolean getTcpNoDelay() throws IOException {
+ return socket.getTcpNoDelay();
+ }
+
+ @Override
+ public void setWriteTimeout(int timeoutMs) {
+ /*
+ * We can't implement write timeouts. :(
+ *
+ * Java provides no facility to set a blocking write timeout on a Socket.
+ * You can simulate a blocking write with a timeout by using
+ * non-blocking I/O. However, we can't use nio here, because this Socket
+ * doesn't have an associated Channel.
+ *
+ * See http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4031100 for
+ * more details.
+ */
+ }
+
+ @Override
+ public boolean isClosed() {
+ return socket.isClosed();
+ }
+
+ @Override
+ public void close() throws IOException {
+ socket.close();
+ }
+
+ @Override
+ public String getRemoteAddressString() {
+ return socket.getRemoteSocketAddress().toString();
+ }
+
+ @Override
+ public String getLocalAddressString() {
+ return socket.getLocalSocketAddress().toString();
+ }
+
+ @Override
+ public InputStream getInputStream() throws IOException {
+ return in;
+ }
+
+ @Override
+ public OutputStream getOutputStream() throws IOException {
+ return out;
+ }
+
+ @Override
+ public boolean isLocal() {
+ return isLocal;
+ }
+
+ @Override
+ public String toString() {
+ return "BasicInetPeer(" + socket.toString() + ")";
+ }
+
+ @Override
+ public DomainSocket getDomainSocket() {
+ return null;
+ }
+
+ @Override
+ public boolean hasSecureChannel() {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed78b14e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java
new file mode 100644
index 0000000..da660c7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.net;
+
+import java.io.IOException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.net.unix.DomainSocket;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.channels.ReadableByteChannel;
+
+/**
+ * Represents a peer that we communicate with by using an encrypted
+ * communications medium.
+ */
+@InterfaceAudience.Private
+public class EncryptedPeer implements Peer {
+ private final Peer enclosedPeer;
+
+ /**
+ * An encrypted InputStream.
+ */
+ private final InputStream in;
+
+ /**
+ * An encrypted OutputStream.
+ */
+ private final OutputStream out;
+
+ /**
+ * An encrypted ReadableByteChannel.
+ */
+ private final ReadableByteChannel channel;
+
+ public EncryptedPeer(Peer enclosedPeer, IOStreamPair ios) {
+ this.enclosedPeer = enclosedPeer;
+ this.in = ios.in;
+ this.out = ios.out;
+ this.channel = ios.in instanceof ReadableByteChannel ?
+ (ReadableByteChannel)ios.in : null;
+ }
+
+ @Override
+ public ReadableByteChannel getInputStreamChannel() {
+ return channel;
+ }
+
+ @Override
+ public void setReadTimeout(int timeoutMs) throws IOException {
+ enclosedPeer.setReadTimeout(timeoutMs);
+ }
+
+ @Override
+ public int getReceiveBufferSize() throws IOException {
+ return enclosedPeer.getReceiveBufferSize();
+ }
+
+ @Override
+ public boolean getTcpNoDelay() throws IOException {
+ return enclosedPeer.getTcpNoDelay();
+ }
+
+ @Override
+ public void setWriteTimeout(int timeoutMs) throws IOException {
+ enclosedPeer.setWriteTimeout(timeoutMs);
+ }
+
+ @Override
+ public boolean isClosed() {
+ return enclosedPeer.isClosed();
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ in.close();
+ } finally {
+ try {
+ out.close();
+ } finally {
+ enclosedPeer.close();
+ }
+ }
+ }
+
+ @Override
+ public String getRemoteAddressString() {
+ return enclosedPeer.getRemoteAddressString();
+ }
+
+ @Override
+ public String getLocalAddressString() {
+ return enclosedPeer.getLocalAddressString();
+ }
+
+ @Override
+ public InputStream getInputStream() throws IOException {
+ return in;
+ }
+
+ @Override
+ public OutputStream getOutputStream() throws IOException {
+ return out;
+ }
+
+ @Override
+ public boolean isLocal() {
+ return enclosedPeer.isLocal();
+ }
+
+ @Override
+ public String toString() {
+ return "EncryptedPeer(" + enclosedPeer + ")";
+ }
+
+ @Override
+ public DomainSocket getDomainSocket() {
+ return enclosedPeer.getDomainSocket();
+ }
+
+ @Override
+ public boolean hasSecureChannel() {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed78b14e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/NioInetPeer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/NioInetPeer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/NioInetPeer.java
new file mode 100644
index 0000000..a12a69b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/NioInetPeer.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.net;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.nio.channels.ReadableByteChannel;
+
+import org.apache.hadoop.net.SocketInputStream;
+import org.apache.hadoop.net.SocketOutputStream;
+import org.apache.hadoop.net.unix.DomainSocket;
+
+/**
+ * Represents a peer that we communicate with by using non-blocking I/O
+ * on a Socket.
+ */
+public class NioInetPeer implements Peer {
+ private final Socket socket;
+
+ /**
+ * An InputStream which simulates blocking I/O with timeouts using NIO.
+ */
+ private final SocketInputStream in;
+
+ /**
+ * An OutputStream which simulates blocking I/O with timeouts using NIO.
+ */
+ private final SocketOutputStream out;
+
+ private final boolean isLocal;
+
+ public NioInetPeer(Socket socket) throws IOException {
+ this.socket = socket;
+ this.in = new SocketInputStream(socket.getChannel(), 0);
+ this.out = new SocketOutputStream(socket.getChannel(), 0);
+ this.isLocal = socket.getInetAddress().equals(socket.getLocalAddress());
+ }
+
+ @Override
+ public ReadableByteChannel getInputStreamChannel() {
+ return in;
+ }
+
+ @Override
+ public void setReadTimeout(int timeoutMs) throws IOException {
+ in.setTimeout(timeoutMs);
+ }
+
+ @Override
+ public int getReceiveBufferSize() throws IOException {
+ return socket.getReceiveBufferSize();
+ }
+
+ @Override
+ public boolean getTcpNoDelay() throws IOException {
+ return socket.getTcpNoDelay();
+ }
+
+ @Override
+ public void setWriteTimeout(int timeoutMs) throws IOException {
+ out.setTimeout(timeoutMs);
+ }
+
+ @Override
+ public boolean isClosed() {
+ return socket.isClosed();
+ }
+
+ @Override
+ public void close() throws IOException {
+ // We always close the outermost streams-- in this case, 'in' and 'out'
+ // Closing either one of these will also close the Socket.
+ try {
+ in.close();
+ } finally {
+ out.close();
+ }
+ }
+
+ @Override
+ public String getRemoteAddressString() {
+ return socket.getRemoteSocketAddress().toString();
+ }
+
+ @Override
+ public String getLocalAddressString() {
+ return socket.getLocalSocketAddress().toString();
+ }
+
+ @Override
+ public InputStream getInputStream() throws IOException {
+ return in;
+ }
+
+ @Override
+ public OutputStream getOutputStream() throws IOException {
+ return out;
+ }
+
+ @Override
+ public boolean isLocal() {
+ return isLocal;
+ }
+
+ @Override
+ public String toString() {
+ return "NioInetPeer(" + socket.toString() + ")";
+ }
+
+ @Override
+ public DomainSocket getDomainSocket() {
+ return null;
+ }
+
+ @Override
+ public boolean hasSecureChannel() {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed78b14e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/Peer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/Peer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/Peer.java
index 42cf287..3c38d5f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/Peer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/Peer.java
@@ -57,8 +57,8 @@ public interface Peer extends Closeable {
* Set the write timeout on this peer.
*
* Note: this is not honored for BasicInetPeer.
- * See {@link BasicSocketPeer#setWriteTimeout} for details.
- *
+ * See {@link BasicInetPeer#setWriteTimeout} for details.
+ *
* @param timeoutMs The timeout in milliseconds.
*/
public void setWriteTimeout(int timeoutMs) throws IOException;
@@ -76,13 +76,13 @@ public interface Peer extends Closeable {
public void close() throws IOException;
/**
- * @return A string representing the remote end of our
+ * @return A string representing the remote end of our
* connection to the peer.
*/
public String getRemoteAddressString();
/**
- * @return A string representing the local end of our
+ * @return A string representing the local end of our
* connection to the peer.
*/
public String getLocalAddressString();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed78b14e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java
new file mode 100644
index 0000000..23407f8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * A little struct class to wrap an InputStream and an OutputStream.
+ */
+@InterfaceAudience.Private
+public class IOStreamPair {
+ public final InputStream in;
+ public final OutputStream out;
+
+ public IOStreamPair(InputStream in, OutputStream out) {
+ this.in = in;
+ this.out = out;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed78b14e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/TrustedChannelResolver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/TrustedChannelResolver.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/TrustedChannelResolver.java
new file mode 100644
index 0000000..3846f4a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/TrustedChannelResolver.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import java.net.InetAddress;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Class used to indicate whether a channel is trusted or not.
+ * The default implementation is to return false indicating that
+ * the channel is not trusted.
+ * This class can be overridden to provide custom logic to determine
+ * whether a channel is trusted or not.
+ * The custom class can be specified via configuration.
+ *
+ */
+public class TrustedChannelResolver implements Configurable {
+ Configuration conf;
+
+ /**
+ * Returns an instance of TrustedChannelResolver.
+ * Looks up the configuration to see if there is custom class specified.
+ * @param conf configuration consulted for a custom resolver class
+ * @return TrustedChannelResolver
+ */
+ public static TrustedChannelResolver getInstance(Configuration conf) {
+ Class<? extends TrustedChannelResolver> clazz =
+ conf.getClass(
+ HdfsClientConfigKeys.DFS_TRUSTEDCHANNEL_RESOLVER_CLASS,
+ TrustedChannelResolver.class, TrustedChannelResolver.class);
+ return ReflectionUtils.newInstance(clazz, conf);
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ /**
+ * Return boolean value indicating whether a channel is trusted or not
+ * from a client's perspective.
+ * @return true if the channel is trusted and false otherwise.
+ */
+ public boolean isTrusted() {
+ return false;
+ }
+
+
+ /**
+ * Return whether the channel to the given peer address is trusted.
+ * @param peerAddress address of the peer
+ * @return true if the channel is trusted and false otherwise.
+ */
+ public boolean isTrusted(InetAddress peerAddress) {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed78b14e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataEncryptionKeyFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataEncryptionKeyFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataEncryptionKeyFactory.java
new file mode 100644
index 0000000..959cba0
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataEncryptionKeyFactory.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+
+/**
+ * Creates a new {@link DataEncryptionKey} on demand.
+ */
+@InterfaceAudience.Private
+public interface DataEncryptionKeyFactory {
+
+ /**
+ * Creates a new DataEncryptionKey.
+ *
+ * @return DataEncryptionKey newly created
+ * @throws IOException for any error
+ */
+ DataEncryptionKey newDataEncryptionKey() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed78b14e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java
new file mode 100644
index 0000000..256caff
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java
@@ -0,0 +1,519 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
+import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.security.sasl.Sasl;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CipherOption;
+import org.apache.hadoop.crypto.CipherSuite;
+import org.apache.hadoop.crypto.CryptoCodec;
+import org.apache.hadoop.crypto.CryptoInputStream;
+import org.apache.hadoop.crypto.CryptoOutputStream;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CipherOptionProto;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.security.SaslPropertiesResolver;
+import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.common.net.InetAddresses;
+import com.google.protobuf.ByteString;
+
+/**
+ * Utility methods implementing SASL negotiation for DataTransferProtocol.
+ */
+@InterfaceAudience.Private
+public final class DataTransferSaslUtil {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ DataTransferSaslUtil.class);
+
+ /**
+ * Delimiter for the three-part SASL username string.
+ */
+ public static final String NAME_DELIMITER = " ";
+
+ /**
+ * Sent by clients and validated by servers. We use a number that's unlikely
+ * to ever be sent as the value of the DATA_TRANSFER_VERSION.
+ */
+ public static final int SASL_TRANSFER_MAGIC_NUMBER = 0xDEADBEEF;
+
+ /**
+ * Checks that SASL negotiation has completed for the given participant, and
+ * the negotiated quality of protection is included in the given SASL
+ * properties and therefore acceptable.
+ *
+ * @param sasl participant to check
+ * @param saslProps properties of SASL negotiation
+ * @throws IOException for any error
+ */
+ public static void checkSaslComplete(SaslParticipant sasl,
+ Map<String, String> saslProps) throws IOException {
+ if (!sasl.isComplete()) {
+ throw new IOException("Failed to complete SASL handshake");
+ }
+ Set<String> requestedQop = ImmutableSet.copyOf(Arrays.asList(
+ saslProps.get(Sasl.QOP).split(",")));
+ String negotiatedQop = sasl.getNegotiatedQop();
+ LOG.debug("Verifying QOP, requested QOP = {}, negotiated QOP = {}",
+ requestedQop, negotiatedQop);
+ if (!requestedQop.contains(negotiatedQop)) {
+ throw new IOException(String.format("SASL handshake completed, but " +
+ "channel does not have acceptable quality of protection, " +
+ "requested = %s, negotiated = %s", requestedQop, negotiatedQop));
+ }
+ }
+
+ /**
+ * Check whether requested SASL Qop contains privacy.
+ *
+ * @param saslProps properties of SASL negotiation
+ * @return boolean true if privacy exists
+ */
+ public static boolean requestedQopContainsPrivacy(
+ Map<String, String> saslProps) {
+ Set<String> requestedQop = ImmutableSet.copyOf(Arrays.asList(
+ saslProps.get(Sasl.QOP).split(",")));
+ return requestedQop.contains("auth-conf");
+ }
+
+ /**
+ * Creates SASL properties required for an encrypted SASL negotiation.
+ *
+ * @param encryptionAlgorithm to use for SASL negotiation
+ * @return properties of encrypted SASL negotiation
+ */
+ public static Map<String, String> createSaslPropertiesForEncryption(
+ String encryptionAlgorithm) {
+ Map<String, String> saslProps = Maps.newHashMapWithExpectedSize(3);
+ saslProps.put(Sasl.QOP, QualityOfProtection.PRIVACY.getSaslQop());
+ saslProps.put(Sasl.SERVER_AUTH, "true");
+ saslProps.put("com.sun.security.sasl.digest.cipher", encryptionAlgorithm);
+ return saslProps;
+ }
+
+ /**
+ * For an encrypted SASL negotiation, encodes an encryption key to a SASL
+ * password.
+ *
+ * @param encryptionKey to encode
+ * @return key encoded as SASL password
+ */
+ public static char[] encryptionKeyToPassword(byte[] encryptionKey) {
+ return new String(Base64.encodeBase64(encryptionKey, false), Charsets.UTF_8)
+ .toCharArray();
+ }
+
+ /**
+ * Returns InetAddress from peer. The getRemoteAddressString has the form
+ * [host][/ip-address]:port. The host may be missing. The IP address (and
+ * preceding '/') may be missing. The port preceded by ':' is always present.
+ *
+ * @param peer connection peer whose remote address string is parsed
+ * @return InetAddress from peer
+ */
+ public static InetAddress getPeerAddress(Peer peer) {
+ String remoteAddr = peer.getRemoteAddressString().split(":")[0];
+ int slashIdx = remoteAddr.indexOf('/');
+ return InetAddresses.forString(slashIdx != -1 ?
+ remoteAddr.substring(slashIdx + 1, remoteAddr.length()) :
+ remoteAddr);
+ }
+
+ /**
+ * Creates a SaslPropertiesResolver from the given configuration. This method
+ * works by cloning the configuration, translating configuration properties
+ * specific to DataTransferProtocol to what SaslPropertiesResolver expects,
+ * and then delegating to SaslPropertiesResolver for initialization. This
+ * method returns null if SASL protection has not been configured for
+ * DataTransferProtocol.
+ *
+ * @param conf configuration to read
+ * @return SaslPropertiesResolver for DataTransferProtocol, or null if not
+ * configured
+ */
+ public static SaslPropertiesResolver getSaslPropertiesResolver(
+ Configuration conf) {
+ String qops = conf.get(DFS_DATA_TRANSFER_PROTECTION_KEY);
+ if (qops == null || qops.isEmpty()) {
+ LOG.debug("DataTransferProtocol not using SaslPropertiesResolver, no " +
+ "QOP found in configuration for {}", DFS_DATA_TRANSFER_PROTECTION_KEY);
+ return null;
+ }
+ Configuration saslPropsResolverConf = new Configuration(conf);
+ saslPropsResolverConf.set(HADOOP_RPC_PROTECTION, qops);
+ Class<? extends SaslPropertiesResolver> resolverClass = conf.getClass(
+ HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS,
+ SaslPropertiesResolver.class, SaslPropertiesResolver.class);
+ resolverClass = conf.getClass(DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY,
+ resolverClass, SaslPropertiesResolver.class);
+ saslPropsResolverConf.setClass(HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS,
+ resolverClass, SaslPropertiesResolver.class);
+ SaslPropertiesResolver resolver = SaslPropertiesResolver.getInstance(
+ saslPropsResolverConf);
+ LOG.debug("DataTransferProtocol using SaslPropertiesResolver, configured " +
+ "QOP {} = {}, configured class {} = {}", DFS_DATA_TRANSFER_PROTECTION_KEY, qops,
+ DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY, resolverClass);
+ return resolver;
+ }
+
+ /**
+ * Reads a SASL negotiation message.
+ *
+ * @param in stream to read
+ * @return bytes of SASL negotiation message
+ * @throws IOException for any error
+ */
+ public static byte[] readSaslMessage(InputStream in) throws IOException {
+ DataTransferEncryptorMessageProto proto =
+ DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in));
+ if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
+ throw new InvalidEncryptionKeyException(proto.getMessage());
+ } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
+ throw new IOException(proto.getMessage());
+ } else {
+ return proto.getPayload().toByteArray();
+ }
+ }
+
+ /**
+ * Reads a SASL negotiation message and negotiation cipher options.
+ *
+ * @param in stream to read
+ * @param cipherOptions list to store negotiation cipher options
+ * @return byte[] SASL negotiation message
+ * @throws IOException for any error
+ */
+ public static byte[] readSaslMessageAndNegotiationCipherOptions(
+ InputStream in, List<CipherOption> cipherOptions) throws IOException {
+ DataTransferEncryptorMessageProto proto =
+ DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in));
+ if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
+ throw new InvalidEncryptionKeyException(proto.getMessage());
+ } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
+ throw new IOException(proto.getMessage());
+ } else {
+ List<CipherOptionProto> optionProtos = proto.getCipherOptionList();
+ if (optionProtos != null) {
+ for (CipherOptionProto optionProto : optionProtos) {
+ cipherOptions.add(PBHelperClient.convert(optionProto));
+ }
+ }
+ return proto.getPayload().toByteArray();
+ }
+ }
+
+ /**
+ * Negotiate a cipher option which server supports.
+ *
+ * @param conf the configuration
+ * @param options the cipher options which client supports
+ * @return CipherOption negotiated cipher option, or null if no cipher suite is configured or no supported option was offered
+ */
+ public static CipherOption negotiateCipherOption(Configuration conf,
+ List<CipherOption> options) throws IOException {
+ // Negotiate cipher suites if configured. Currently, the only supported
+ // cipher suite is AES/CTR/NoPadding, but the protocol allows multiple
+ // values for future expansion.
+ String cipherSuites = conf.get(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY);
+ if (cipherSuites == null || cipherSuites.isEmpty()) {
+ return null;
+ }
+ if (!cipherSuites.equals(CipherSuite.AES_CTR_NOPADDING.getName())) {
+ throw new IOException(String.format("Invalid cipher suite, %s=%s",
+ DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuites));
+ }
+ if (options != null) {
+ for (CipherOption option : options) {
+ CipherSuite suite = option.getCipherSuite();
+ if (suite == CipherSuite.AES_CTR_NOPADDING) {
+ int keyLen = conf.getInt(
+ DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY,
+ DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT) / 8;
+ CryptoCodec codec = CryptoCodec.getInstance(conf, suite);
+ byte[] inKey = new byte[keyLen];
+ byte[] inIv = new byte[suite.getAlgorithmBlockSize()];
+ byte[] outKey = new byte[keyLen];
+ byte[] outIv = new byte[suite.getAlgorithmBlockSize()];
+ codec.generateSecureRandom(inKey);
+ codec.generateSecureRandom(inIv);
+ codec.generateSecureRandom(outKey);
+ codec.generateSecureRandom(outIv);
+ return new CipherOption(suite, inKey, inIv, outKey, outIv);
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Send SASL message and negotiated cipher option to client.
+ *
+ * @param out stream to receive message
+ * @param payload to send
+ * @param option negotiated cipher option
+ * @throws IOException for any error
+ */
+ public static void sendSaslMessageAndNegotiatedCipherOption(
+ OutputStream out, byte[] payload, CipherOption option)
+ throws IOException {
+ DataTransferEncryptorMessageProto.Builder builder =
+ DataTransferEncryptorMessageProto.newBuilder();
+
+ builder.setStatus(DataTransferEncryptorStatus.SUCCESS);
+ if (payload != null) {
+ builder.setPayload(ByteString.copyFrom(payload));
+ }
+ if (option != null) {
+ builder.addCipherOption(PBHelperClient.convert(option));
+ }
+
+ DataTransferEncryptorMessageProto proto = builder.build();
+ proto.writeDelimitedTo(out);
+ out.flush();
+ }
+
+ /**
+ * Create IOStreamPair of {@link org.apache.hadoop.crypto.CryptoInputStream}
+ * and {@link org.apache.hadoop.crypto.CryptoOutputStream}
+ *
+ * @param conf the configuration
+ * @param cipherOption negotiated cipher option
+ * @param out underlying output stream
+ * @param in underlying input stream
+ * @param isServer is server side
+ * @return IOStreamPair the stream pair
+ * @throws IOException for any error
+ */
+ public static IOStreamPair createStreamPair(Configuration conf,
+ CipherOption cipherOption, OutputStream out, InputStream in,
+ boolean isServer) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Creating IOStreamPair of CryptoInputStream and " +
+ "CryptoOutputStream.");
+ }
+ CryptoCodec codec = CryptoCodec.getInstance(conf,
+ cipherOption.getCipherSuite());
+ byte[] inKey = cipherOption.getInKey();
+ byte[] inIv = cipherOption.getInIv();
+ byte[] outKey = cipherOption.getOutKey();
+ byte[] outIv = cipherOption.getOutIv();
+ InputStream cIn = new CryptoInputStream(in, codec,
+ isServer ? inKey : outKey, isServer ? inIv : outIv);
+ OutputStream cOut = new CryptoOutputStream(out, codec,
+ isServer ? outKey : inKey, isServer ? outIv : inIv);
+ return new IOStreamPair(cIn, cOut);
+ }
+
+ /**
+ * Sends a SASL negotiation message indicating an error.
+ *
+ * @param out stream to receive message
+ * @param message to send
+ * @throws IOException for any error
+ */
+ public static void sendGenericSaslErrorMessage(OutputStream out,
+ String message) throws IOException {
+ sendSaslMessage(out, DataTransferEncryptorStatus.ERROR, null, message);
+ }
+
+ /**
+ * Sends a SASL negotiation message.
+ *
+ * @param out stream to receive message
+ * @param payload to send
+ * @throws IOException for any error
+ */
+ public static void sendSaslMessage(OutputStream out, byte[] payload)
+ throws IOException {
+ sendSaslMessage(out, DataTransferEncryptorStatus.SUCCESS, payload, null);
+ }
+
+ /**
+ * Send a SASL negotiation message and negotiation cipher options to server.
+ *
+ * @param out stream to receive message
+ * @param payload to send
+ * @param options cipher options to negotiate
+ * @throws IOException for any error
+ */
+ public static void sendSaslMessageAndNegotiationCipherOptions(
+ OutputStream out, byte[] payload, List<CipherOption> options)
+ throws IOException {
+ DataTransferEncryptorMessageProto.Builder builder =
+ DataTransferEncryptorMessageProto.newBuilder();
+
+ builder.setStatus(DataTransferEncryptorStatus.SUCCESS);
+ if (payload != null) {
+ builder.setPayload(ByteString.copyFrom(payload));
+ }
+ if (options != null) {
+ builder.addAllCipherOption(PBHelperClient.convertCipherOptions(options));
+ }
+
+ DataTransferEncryptorMessageProto proto = builder.build();
+ proto.writeDelimitedTo(out);
+ out.flush();
+ }
+
+ /**
+ * Read SASL message and negotiated cipher option from server.
+ *
+ * @param in stream to read
+ * @return SaslResponseWithNegotiatedCipherOption SASL message and
+ * negotiated cipher option
+ * @throws IOException for any error
+ */
+ public static SaslResponseWithNegotiatedCipherOption
+ readSaslMessageAndNegotiatedCipherOption(InputStream in)
+ throws IOException {
+ DataTransferEncryptorMessageProto proto =
+ DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in));
+ if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
+ throw new InvalidEncryptionKeyException(proto.getMessage());
+ } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
+ throw new IOException(proto.getMessage());
+ } else {
+ byte[] response = proto.getPayload().toByteArray();
+ List<CipherOption> options = PBHelperClient.convertCipherOptionProtos(
+ proto.getCipherOptionList());
+ CipherOption option = null;
+ if (options != null && !options.isEmpty()) {
+ option = options.get(0);
+ }
+ return new SaslResponseWithNegotiatedCipherOption(response, option);
+ }
+ }
+
+ /**
+ * Encrypt the in and out keys of the negotiated cipher option; the ivs are copied unchanged.
+ *
+ * @param option negotiated cipher option
+ * @param sasl SASL participant representing server
+ * @return CipherOption negotiated cipher option which contains the
+ * encrypted keys and the original ivs, or null if option is null
+ * @throws IOException for any error
+ */
+ public static CipherOption wrap(CipherOption option, SaslParticipant sasl)
+ throws IOException {
+ if (option != null) {
+ byte[] inKey = option.getInKey();
+ if (inKey != null) {
+ inKey = sasl.wrap(inKey, 0, inKey.length);
+ }
+ byte[] outKey = option.getOutKey();
+ if (outKey != null) {
+ outKey = sasl.wrap(outKey, 0, outKey.length);
+ }
+ return new CipherOption(option.getCipherSuite(), inKey, option.getInIv(),
+ outKey, option.getOutIv());
+ }
+
+ return null;
+ }
+
+ /**
+ * Decrypt the in and out keys of the negotiated cipher option; the ivs are copied unchanged.
+ *
+ * @param option negotiated cipher option
+ * @param sasl SASL participant representing client
+ * @return CipherOption negotiated cipher option which contains the
+ * decrypted keys and the original ivs, or null if option is null
+ * @throws IOException for any error
+ */
+ public static CipherOption unwrap(CipherOption option, SaslParticipant sasl)
+ throws IOException {
+ if (option != null) {
+ byte[] inKey = option.getInKey();
+ if (inKey != null) {
+ inKey = sasl.unwrap(inKey, 0, inKey.length);
+ }
+ byte[] outKey = option.getOutKey();
+ if (outKey != null) {
+ outKey = sasl.unwrap(outKey, 0, outKey.length);
+ }
+ return new CipherOption(option.getCipherSuite(), inKey, option.getInIv(),
+ outKey, option.getOutIv());
+ }
+
+ return null;
+ }
+
+ /**
+ * Sends a SASL negotiation message.
+ *
+ * @param out stream to receive message
+ * @param status negotiation status
+ * @param payload to send
+ * @param message to send
+ * @throws IOException for any error
+ */
+ public static void sendSaslMessage(OutputStream out,
+ DataTransferEncryptorStatus status, byte[] payload, String message)
+ throws IOException {
+ DataTransferEncryptorMessageProto.Builder builder =
+ DataTransferEncryptorMessageProto.newBuilder();
+
+ builder.setStatus(status);
+ if (payload != null) {
+ builder.setPayload(ByteString.copyFrom(payload));
+ }
+ if (message != null) {
+ builder.setMessage(message);
+ }
+
+ DataTransferEncryptorMessageProto proto = builder.build();
+ proto.writeDelimitedTo(out);
+ out.flush();
+ }
+
+ /**
+ * There is no reason to instantiate this class.
+ */
+ private DataTransferSaslUtil() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed78b14e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java
new file mode 100644
index 0000000..913203c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java
@@ -0,0 +1,498 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
+
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
+import static org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.*;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.RealmChoiceCallback;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CipherOption;
+import org.apache.hadoop.crypto.CipherSuite;
+import org.apache.hadoop.hdfs.net.EncryptedPeer;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.security.SaslPropertiesResolver;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+
+/**
+ * Negotiates SASL for DataTransferProtocol on behalf of a client. There are
+ * two possible supported variants of SASL negotiation: either a general-purpose
+ * negotiation supporting any quality of protection, or a specialized
+ * negotiation that enforces privacy as the quality of protection using a
+ * cryptographically strong encryption key.
+ *
+ * This class is used in both the HDFS client and the DataNode. The DataNode
+ * needs it, because it acts as a client to other DataNodes during write
+ * pipelines and block transfers.
+ */
+@InterfaceAudience.Private
+public class SaslDataTransferClient {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ SaslDataTransferClient.class);
+
+ private final Configuration conf;
+ private final AtomicBoolean fallbackToSimpleAuth;
+ private final SaslPropertiesResolver saslPropsResolver;
+ private final TrustedChannelResolver trustedChannelResolver;
+
+ /**
+ * Creates a new SaslDataTransferClient. This constructor is used in cases
+ * where it is not relevant to track if a secure client did a fallback to
+ * simple auth. For intra-cluster connections between data nodes in the same
+ * cluster, we can assume that all run under the same security configuration.
+ *
+ * @param conf the configuration
+ * @param saslPropsResolver for determining properties of SASL negotiation
+ * @param trustedChannelResolver for identifying trusted connections that do
+ * not require SASL negotiation
+ */
+ public SaslDataTransferClient(Configuration conf,
+ SaslPropertiesResolver saslPropsResolver,
+ TrustedChannelResolver trustedChannelResolver) {
+ this(conf, saslPropsResolver, trustedChannelResolver, null);
+ }
+
+ /**
+ * Creates a new SaslDataTransferClient.
+ *
+ * @param conf the configuration
+ * @param saslPropsResolver for determining properties of SASL negotiation
+ * @param trustedChannelResolver for identifying trusted connections that do
+ * not require SASL negotiation
+ * @param fallbackToSimpleAuth checked on each attempt at general SASL
+ * handshake, if true forces use of simple auth
+ */
+ public SaslDataTransferClient(Configuration conf,
+ SaslPropertiesResolver saslPropsResolver,
+ TrustedChannelResolver trustedChannelResolver,
+ AtomicBoolean fallbackToSimpleAuth) {
+ this.conf = conf;
+ this.fallbackToSimpleAuth = fallbackToSimpleAuth;
+ this.saslPropsResolver = saslPropsResolver;
+ this.trustedChannelResolver = trustedChannelResolver;
+ }
+
+ /**
+ * Sends client SASL negotiation for a newly allocated socket if required.
+ *
+ * @param socket connection socket
+ * @param underlyingOut connection output stream
+ * @param underlyingIn connection input stream
+ * @param encryptionKeyFactory for creation of an encryption key
+ * @param accessToken connection block access token
+ * @param datanodeId ID of destination DataNode
+ * @return new pair of streams, wrapped after SASL negotiation
+ * @throws IOException for any error
+ */
+ public IOStreamPair newSocketSend(Socket socket, OutputStream underlyingOut,
+ InputStream underlyingIn, DataEncryptionKeyFactory encryptionKeyFactory,
+ Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
+ throws IOException {
+ // The encryption key factory only returns a key if encryption is enabled.
+ DataEncryptionKey encryptionKey = !trustedChannelResolver.isTrusted() ?
+ encryptionKeyFactory.newDataEncryptionKey() : null;
+ IOStreamPair ios = send(socket.getInetAddress(), underlyingOut,
+ underlyingIn, encryptionKey, accessToken, datanodeId);
+ return ios != null ? ios : new IOStreamPair(underlyingIn, underlyingOut);
+ }
+
+ /**
+ * Sends client SASL negotiation for a peer if required.
+ *
+ * @param peer connection peer
+ * @param encryptionKeyFactory for creation of an encryption key
+ * @param accessToken connection block access token
+ * @param datanodeId ID of destination DataNode
+ * @return peer wrapping the streams produced by SASL negotiation, or the given peer unchanged if no negotiation was performed
+ * @throws IOException for any error
+ */
+ public Peer peerSend(Peer peer, DataEncryptionKeyFactory encryptionKeyFactory,
+ Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
+ throws IOException {
+ IOStreamPair ios = checkTrustAndSend(getPeerAddress(peer),
+ peer.getOutputStream(), peer.getInputStream(), encryptionKeyFactory,
+ accessToken, datanodeId);
+ // TODO: Consider renaming EncryptedPeer to SaslPeer.
+ return ios != null ? new EncryptedPeer(peer, ios) : peer;
+ }
+
+ /**
+ * Sends client SASL negotiation for a socket if required.
+ *
+ * @param socket connection socket
+ * @param underlyingOut connection output stream
+ * @param underlyingIn connection input stream
+ * @param encryptionKeyFactory for creation of an encryption key
+ * @param accessToken connection block access token
+ * @param datanodeId ID of destination DataNode
+ * @return new pair of streams, wrapped after SASL negotiation
+ * @throws IOException for any error
+ */
+ public IOStreamPair socketSend(Socket socket, OutputStream underlyingOut,
+ InputStream underlyingIn, DataEncryptionKeyFactory encryptionKeyFactory,
+ Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
+ throws IOException {
+ IOStreamPair ios = checkTrustAndSend(socket.getInetAddress(), underlyingOut,
+ underlyingIn, encryptionKeyFactory, accessToken, datanodeId);
+ return ios != null ? ios : new IOStreamPair(underlyingIn, underlyingOut);
+ }
+
+ /**
+ * Checks if an address is already trusted and then sends client SASL
+ * negotiation if required.
+ *
+ * @param addr connection address
+ * @param underlyingOut connection output stream
+ * @param underlyingIn connection input stream
+ * @param encryptionKeyFactory for creation of an encryption key
+ * @param accessToken connection block access token
+ * @param datanodeId ID of destination DataNode
+ * @return new pair of streams, wrapped after SASL negotiation, or null if the handshake was skipped
+ * @throws IOException for any error
+ */
+ private IOStreamPair checkTrustAndSend(InetAddress addr,
+ OutputStream underlyingOut, InputStream underlyingIn,
+ DataEncryptionKeyFactory encryptionKeyFactory,
+ Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
+ throws IOException {
+ if (!trustedChannelResolver.isTrusted() &&
+ !trustedChannelResolver.isTrusted(addr)) {
+ // The encryption key factory only returns a key if encryption is enabled.
+ DataEncryptionKey encryptionKey =
+ encryptionKeyFactory.newDataEncryptionKey();
+ return send(addr, underlyingOut, underlyingIn, encryptionKey, accessToken,
+ datanodeId);
+ } else {
+ LOG.debug(
+ "SASL client skipping handshake on trusted connection for addr = {}, "
+ + "datanodeId = {}", addr, datanodeId);
+ return null;
+ }
+ }
+
+ /**
+ * Sends client SASL negotiation if required. Determines the correct type of
+ * SASL handshake based on configuration.
+ *
+ * @param addr connection address
+ * @param underlyingOut connection output stream
+ * @param underlyingIn connection input stream
+ * @param encryptionKey for an encrypted SASL handshake
+ * @param accessToken connection block access token
+ * @param datanodeId ID of destination DataNode
+ * @return new pair of streams, wrapped after SASL negotiation, or null if the handshake was skipped
+ * @throws IOException for any error
+ */
+ private IOStreamPair send(InetAddress addr, OutputStream underlyingOut,
+ InputStream underlyingIn, DataEncryptionKey encryptionKey,
+ Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
+ throws IOException {
+ if (encryptionKey != null) {
+ LOG.debug(
+ "SASL client doing encrypted handshake for addr = {}, datanodeId = {}",
+ addr, datanodeId);
+ return getEncryptedStreams(underlyingOut, underlyingIn,
+ encryptionKey);
+ } else if (!UserGroupInformation.isSecurityEnabled()) {
+ LOG.debug(
+ "SASL client skipping handshake in unsecured configuration for "
+ + "addr = {}, datanodeId = {}", addr, datanodeId);
+ return null;
+ } else if (SecurityUtil.isPrivilegedPort(datanodeId.getXferPort())) {
+ LOG.debug(
+ "SASL client skipping handshake in secured configuration with "
+ + "privileged port for addr = {}, datanodeId = {}", addr, datanodeId);
+ return null;
+ } else if (fallbackToSimpleAuth != null && fallbackToSimpleAuth.get()) {
+ LOG.debug(
+ "SASL client skipping handshake in secured configuration with "
+ + "unsecured cluster for addr = {}, datanodeId = {}", addr, datanodeId);
+ return null;
+ } else if (saslPropsResolver != null) {
+ LOG.debug(
+ "SASL client doing general handshake for addr = {}, datanodeId = {}",
+ addr, datanodeId);
+ return getSaslStreams(addr, underlyingOut, underlyingIn, accessToken,
+ datanodeId);
+ } else {
+ // It's a secured cluster using non-privileged ports, but no SASL. The
+ // only way this can happen is if the DataNode has
+ // ignore.secure.ports.for.testing configured, so this is a rare edge case.
+ LOG.debug(
+ "SASL client skipping handshake in secured configuration with no SASL "
+ + "protection configured for addr = {}, datanodeId = {}",
+ addr, datanodeId);
+ return null;
+ }
+ }
+
+  /**
+   * Performs the client side of the specialized encrypted SASL handshake.
+   * The SASL properties, the user name and the password are all derived from
+   * the supplied encryption key.
+   *
+   * @param underlyingOut connection output stream
+   * @param underlyingIn connection input stream
+   * @param encryptionKey for an encrypted SASL handshake
+   * @return new pair of streams, wrapped after SASL negotiation
+   * @throws IOException for any error
+   */
+  private IOStreamPair getEncryptedStreams(OutputStream underlyingOut,
+      InputStream underlyingIn, DataEncryptionKey encryptionKey)
+      throws IOException {
+    Map<String, String> negotiationProps =
+        createSaslPropertiesForEncryption(encryptionKey.encryptionAlgorithm);
+
+    LOG.debug("Client using encryption algorithm {}",
+        encryptionKey.encryptionAlgorithm);
+
+    // Credentials presented to the SASL layer, both built from the key.
+    String saslUser = getUserNameFromEncryptionKey(encryptionKey);
+    char[] saslPassword = encryptionKeyToPassword(encryptionKey.encryptionKey);
+
+    return doSaslHandshake(underlyingOut, underlyingIn, saslUser,
+        negotiationProps,
+        new SaslClientCallbackHandler(saslUser, saslPassword));
+  }
+
+  /**
+   * Builds the SASL user name used for an encrypted handshake. The name is
+   * the keyId, the blockPoolId and the nonce, in that order, joined by
+   * NAME_DELIMITER. The first two are rendered as plain Strings; the nonce is
+   * Base64-encoded (without line chunking) and decoded to text as UTF-8.
+   *
+   * @param encryptionKey the encryption key to encode as a SASL username.
+   * @return encoded username containing keyId, blockPoolId, and nonce
+   */
+  private static String getUserNameFromEncryptionKey(
+      DataEncryptionKey encryptionKey) {
+    byte[] encodedNonce = Base64.encodeBase64(encryptionKey.nonce, false);
+    String nonceText = new String(encodedNonce, Charsets.UTF_8);
+    return encryptionKey.keyId + NAME_DELIMITER
+        + encryptionKey.blockPoolId + NAME_DELIMITER
+        + nonceText;
+  }
+
+  /**
+   * Callback handler that answers the client-side SASL object with a fixed
+   * user name and password.
+   */
+  private static final class SaslClientCallbackHandler
+      implements CallbackHandler {
+
+    private final String userName;
+    private final char[] password;
+
+    /**
+     * Creates a new SaslClientCallbackHandler.
+     *
+     * @param userName SASL user name
+     * @param password SASL password
+     */
+    public SaslClientCallbackHandler(String userName, char[] password) {
+      this.userName = userName;
+      this.password = password;
+    }
+
+    /**
+     * Answers name, password and realm callbacks. A RealmChoiceCallback is
+     * accepted but left untouched. Any other callback type is rejected, and
+     * in that case no callback in the array has been populated yet, because
+     * values are only filled in after the whole array has been inspected.
+     *
+     * @param callbacks callbacks supplied by the SASL client
+     * @throws IOException declared by the CallbackHandler interface
+     * @throws UnsupportedCallbackException if a callback of an unrecognized
+     *     type is present
+     */
+    @Override
+    public void handle(Callback[] callbacks) throws IOException,
+        UnsupportedCallbackException {
+      NameCallback nameCallback = null;
+      PasswordCallback passwordCallback = null;
+      RealmCallback realmCallback = null;
+
+      // First pass: classify every callback, remembering the last one seen
+      // of each supported kind.
+      for (int i = 0; i < callbacks.length; i++) {
+        Callback current = callbacks[i];
+        if (current instanceof NameCallback) {
+          nameCallback = (NameCallback) current;
+        } else if (current instanceof PasswordCallback) {
+          passwordCallback = (PasswordCallback) current;
+        } else if (current instanceof RealmCallback) {
+          realmCallback = (RealmCallback) current;
+        } else if (!(current instanceof RealmChoiceCallback)) {
+          throw new UnsupportedCallbackException(current,
+              "Unrecognized SASL client callback");
+        }
+      }
+
+      // Second pass: supply the answers.
+      if (nameCallback != null) {
+        nameCallback.setName(userName);
+      }
+      if (passwordCallback != null) {
+        passwordCallback.setPassword(password);
+      }
+      if (realmCallback != null) {
+        // The realm is simply answered with the default the callback offers.
+        realmCallback.setText(realmCallback.getDefaultText());
+      }
+    }
+  }
+
+  /**
+   * Performs the client side of the general-purpose SASL handshake. The SASL
+   * properties are obtained from the SASL properties resolver for the given
+   * address; the user name and the password are both built from the block
+   * access token.
+   *
+   * @param addr connection address
+   * @param underlyingOut connection output stream
+   * @param underlyingIn connection input stream
+   * @param accessToken connection block access token
+   * @param datanodeId ID of destination DataNode (not read by this method)
+   * @return new pair of streams, wrapped after SASL negotiation
+   * @throws IOException for any error
+   */
+  private IOStreamPair getSaslStreams(InetAddress addr,
+      OutputStream underlyingOut, InputStream underlyingIn,
+      Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
+      throws IOException {
+    Map<String, String> negotiationProps =
+        saslPropsResolver.getClientProperties(addr);
+
+    // Credentials presented to the SASL layer, both built from the token.
+    String saslUser = buildUserName(accessToken);
+    char[] saslPassword = buildClientPassword(accessToken);
+
+    return doSaslHandshake(underlyingOut, underlyingIn, saslUser,
+        negotiationProps,
+        new SaslClientCallbackHandler(saslUser, saslPassword));
+  }
+
+  /**
+   * Builds the client's user name for the general-purpose handshake: the
+   * serialized block access token identifier, Base64-encoded (without line
+   * chunking) and decoded to text as UTF-8. Only the token identifier is
+   * used, not the token itself, which would include the password. The
+   * password is a shared secret, and we must not write it on the network
+   * during the SASL authentication exchange.
+   *
+   * @param blockToken for block access
+   * @return SASL user name
+   */
+  private static String buildUserName(Token<BlockTokenIdentifier> blockToken) {
+    byte[] encodedIdentifier =
+        Base64.encodeBase64(blockToken.getIdentifier(), false);
+    return new String(encodedIdentifier, Charsets.UTF_8);
+  }
+
+  /**
+   * Calculates the password on the client side for the general-purpose
+   * handshake: the block access token's password, Base64-encoded (without
+   * line chunking), decoded to text as UTF-8 and returned as characters.
+   *
+   * @param blockToken for block access
+   * @return SASL password
+   */
+  private char[] buildClientPassword(Token<BlockTokenIdentifier> blockToken) {
+    byte[] encodedPassword =
+        Base64.encodeBase64(blockToken.getPassword(), false);
+    String passwordText = new String(encodedPassword, Charsets.UTF_8);
+    return passwordText.toCharArray();
+  }
+
+ /**
+ * This method actually executes the client-side SASL handshake.
+ *
+ * The sequence is: write SASL_TRANSFER_MAGIC_NUMBER, send an empty initial
+ * message, evaluate the first message read from the peer and reply
+ * (optionally attaching a cipher option), evaluate the second message read
+ * from the peer, verify that negotiation is complete, and finally build the
+ * stream pair. If an IOException is raised once the message exchange has
+ * started, a generic SASL error message carrying the exception's message is
+ * sent on the output stream and the exception is rethrown.
+ *
+ * @param underlyingOut connection output stream
+ * @param underlyingIn connection input stream
+ * @param userName SASL user name
+ * @param saslProps properties of SASL negotiation
+ * @param callbackHandler for responding to SASL callbacks
+ * @return new pair of streams, wrapped after SASL negotiation
+ * @throws IOException for any error
+ */
+ private IOStreamPair doSaslHandshake(OutputStream underlyingOut,
+ InputStream underlyingIn, String userName, Map<String, String> saslProps,
+ CallbackHandler callbackHandler) throws IOException {
+
+ DataOutputStream out = new DataOutputStream(underlyingOut);
+ DataInputStream in = new DataInputStream(underlyingIn);
+
+ // Creating the participant and writing the magic number both happen
+ // outside the try block below, so a failure here is propagated without
+ // an error message being sent.
+ SaslParticipant sasl= SaslParticipant.createClientSaslParticipant(userName,
+ saslProps, callbackHandler);
+
+ out.writeInt(SASL_TRANSFER_MAGIC_NUMBER);
+ out.flush();
+
+ try {
+ // Start of handshake - "initial response" in SASL terminology.
+ sendSaslMessage(out, new byte[0]);
+
+ // step 1
+ byte[] remoteResponse = readSaslMessage(in);
+ byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
+ // Stays null unless privacy is requested and a cipher suite is configured.
+ List<CipherOption> cipherOptions = null;
+ if (requestedQopContainsPrivacy(saslProps)) {
+ // Negotiate cipher suites if configured. Currently, the only supported
+ // cipher suite is AES/CTR/NoPadding, but the protocol allows multiple
+ // values for future expansion.
+ String cipherSuites = conf.get(
+ DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY);
+ if (cipherSuites != null && !cipherSuites.isEmpty()) {
+ // The configured value must match the AES/CTR/NoPadding suite name
+ // exactly; anything else is rejected.
+ if (!cipherSuites.equals(CipherSuite.AES_CTR_NOPADDING.getName())) {
+ throw new IOException(String.format("Invalid cipher suite, %s=%s",
+ DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuites));
+ }
+ CipherOption option = new CipherOption(CipherSuite.AES_CTR_NOPADDING);
+ cipherOptions = Lists.newArrayListWithCapacity(1);
+ cipherOptions.add(option);
+ }
+ }
+ sendSaslMessageAndNegotiationCipherOptions(out, localResponse,
+ cipherOptions);
+
+ // step 2 (client-side only)
+ SaslResponseWithNegotiatedCipherOption response =
+ readSaslMessageAndNegotiatedCipherOption(in);
+ localResponse = sasl.evaluateChallengeOrResponse(response.payload);
+ // Only checked when assertions are enabled (-ea).
+ assert localResponse == null;
+
+ // SASL handshake is complete
+ checkSaslComplete(sasl, saslProps);
+
+ CipherOption cipherOption = null;
+ if (sasl.isNegotiatedQopPrivacy()) {
+ // Unwrap the negotiated cipher option
+ cipherOption = unwrap(response.cipherOption, sasl);
+ }
+
+ // If negotiated cipher option is not null, we will use it to create
+ // stream pair. That pair is built on the raw underlying streams;
+ // otherwise the SASL participant wraps the Data*Stream wrappers.
+ return cipherOption != null ? createStreamPair(
+ conf, cipherOption, underlyingOut, underlyingIn, false) :
+ sasl.createStreamPair(out, in);
+ } catch (IOException ioe) {
+ // Send a generic error message to the peer before rethrowing.
+ // NOTE(review): if sending the error message itself throws, that
+ // exception propagates and the original ioe is lost - confirm whether
+ // callers rely on seeing the original exception type.
+ sendGenericSaslErrorMessage(out, ioe.getMessage());
+ throw ioe;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed78b14e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java
new file mode 100644
index 0000000..f14a075
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.util.Map;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.security.SaslInputStream;
+import org.apache.hadoop.security.SaslOutputStream;
+
+/**
+ * Strongly inspired by Thrift's TSaslTransport class.
+ *
+ * Used to abstract over the <code>SaslServer</code> and
+ * <code>SaslClient</code> classes, which share a lot of their interface, but
+ * unfortunately don't share a common superclass. Each instance wraps exactly
+ * one of the two and forwards every call to it.
+ */
+@InterfaceAudience.Private
+class SaslParticipant {
+
+  // This has to be set as part of the SASL spec. Its value doesn't matter for
+  // our purposes, but it may not be empty. It's sent over the wire, so use
+  // a short string.
+  private static final String SERVER_NAME = "0";
+  private static final String PROTOCOL = "hdfs";
+  private static final String MECHANISM = "DIGEST-MD5";
+
+  // Exactly one of these is non-null; the other is always null.
+  private final SaslServer saslServer;
+  private final SaslClient saslClient;
+
+  /**
+   * Creates a SaslParticipant wrapping a SaslServer.
+   *
+   * @param saslProps properties of SASL negotiation
+   * @param callbackHandler for handling all SASL callbacks
+   * @return SaslParticipant wrapping SaslServer
+   * @throws SaslException for any error, including when no SaslServer can be
+   *     created for the mechanism
+   */
+  public static SaslParticipant createServerSaslParticipant(
+      Map<String, String> saslProps, CallbackHandler callbackHandler)
+      throws SaslException {
+    SaslServer server = Sasl.createSaslServer(MECHANISM, PROTOCOL,
+        SERVER_NAME, saslProps, callbackHandler);
+    if (server == null) {
+      // Sasl.createSaslServer signals "no factory supports this mechanism"
+      // by returning null. Fail here instead of with a later
+      // NullPointerException on first use.
+      throw new SaslException(
+          "Unable to create SASL server for mechanism " + MECHANISM);
+    }
+    return new SaslParticipant(server);
+  }
+
+  /**
+   * Creates a SaslParticipant wrapping a SaslClient.
+   *
+   * @param userName SASL user name
+   * @param saslProps properties of SASL negotiation
+   * @param callbackHandler for handling all SASL callbacks
+   * @return SaslParticipant wrapping SaslClient
+   * @throws SaslException for any error, including when no SaslClient can be
+   *     created for the mechanism
+   */
+  public static SaslParticipant createClientSaslParticipant(String userName,
+      Map<String, String> saslProps, CallbackHandler callbackHandler)
+      throws SaslException {
+    SaslClient client = Sasl.createSaslClient(new String[] { MECHANISM },
+        userName, PROTOCOL, SERVER_NAME, saslProps, callbackHandler);
+    if (client == null) {
+      // Sasl.createSaslClient signals "no factory supports this mechanism"
+      // by returning null. Without this check both fields would be null and
+      // every later call would fail in the server branch.
+      throw new SaslException(
+          "Unable to create SASL client for mechanism " + MECHANISM);
+    }
+    return new SaslParticipant(client);
+  }
+
+  /**
+   * Private constructor wrapping a SaslServer.
+   *
+   * @param saslServer to wrap
+   */
+  private SaslParticipant(SaslServer saslServer) {
+    this.saslServer = saslServer;
+    this.saslClient = null;
+  }
+
+  /**
+   * Private constructor wrapping a SaslClient.
+   *
+   * @param saslClient to wrap
+   */
+  private SaslParticipant(SaslClient saslClient) {
+    this.saslServer = null;
+    this.saslClient = saslClient;
+  }
+
+  /**
+   * Evaluates a message from the peer: a challenge when wrapping a client, a
+   * response when wrapping a server.
+   *
+   * @param challengeOrResponse bytes received from the peer
+   * @return the bytes produced by the wrapped object in reply
+   * @throws SaslException if the wrapped object rejects the message
+   * @see SaslServer#evaluateResponse
+   * @see SaslClient#evaluateChallenge
+   */
+  public byte[] evaluateChallengeOrResponse(byte[] challengeOrResponse)
+      throws SaslException {
+    if (saslClient != null) {
+      return saslClient.evaluateChallenge(challengeOrResponse);
+    } else {
+      return saslServer.evaluateResponse(challengeOrResponse);
+    }
+  }
+
+  /**
+   * After successful SASL negotiation, returns the negotiated quality of
+   * protection.
+   *
+   * @return negotiated quality of protection
+   */
+  public String getNegotiatedQop() {
+    if (saslClient != null) {
+      return (String) saslClient.getNegotiatedProperty(Sasl.QOP);
+    } else {
+      return (String) saslServer.getNegotiatedProperty(Sasl.QOP);
+    }
+  }
+
+  /**
+   * After successful SASL negotiation, returns whether it's QOP privacy,
+   * that is, whether the negotiated QOP is "auth-conf" (compared ignoring
+   * case).
+   *
+   * @return boolean whether it's QOP privacy
+   */
+  public boolean isNegotiatedQopPrivacy() {
+    // equalsIgnoreCase returns false for a null argument, so no separate
+    // null check is needed.
+    return "auth-conf".equalsIgnoreCase(getNegotiatedQop());
+  }
+
+  /**
+   * Wraps a byte array.
+   *
+   * @param bytes The array containing the bytes to wrap.
+   * @param off The starting position at the array
+   * @param len The number of bytes to wrap
+   * @return byte[] wrapped bytes
+   * @throws SaslException if the bytes cannot be successfully wrapped
+   */
+  public byte[] wrap(byte[] bytes, int off, int len) throws SaslException {
+    if (saslClient != null) {
+      return saslClient.wrap(bytes, off, len);
+    } else {
+      return saslServer.wrap(bytes, off, len);
+    }
+  }
+
+  /**
+   * Unwraps a byte array.
+   *
+   * @param bytes The array containing the bytes to unwrap.
+   * @param off The starting position at the array
+   * @param len The number of bytes to unwrap
+   * @return byte[] unwrapped bytes
+   * @throws SaslException if the bytes cannot be successfully unwrapped
+   */
+  public byte[] unwrap(byte[] bytes, int off, int len) throws SaslException {
+    if (saslClient != null) {
+      return saslClient.unwrap(bytes, off, len);
+    } else {
+      return saslServer.unwrap(bytes, off, len);
+    }
+  }
+
+  /**
+   * Returns true if SASL negotiation is complete.
+   *
+   * @return true if SASL negotiation is complete
+   */
+  public boolean isComplete() {
+    if (saslClient != null) {
+      return saslClient.isComplete();
+    } else {
+      return saslServer.isComplete();
+    }
+  }
+
+  /**
+   * Return some input/output streams that may henceforth have their
+   * communication encrypted, depending on the negotiated quality of protection.
+   *
+   * @param out output stream to wrap
+   * @param in input stream to wrap
+   * @return IOStreamPair wrapping the streams
+   */
+  public IOStreamPair createStreamPair(DataOutputStream out,
+      DataInputStream in) {
+    if (saslClient != null) {
+      return new IOStreamPair(
+          new SaslInputStream(in, saslClient),
+          new SaslOutputStream(out, saslClient));
+    } else {
+      return new IOStreamPair(
+          new SaslInputStream(in, saslServer),
+          new SaslOutputStream(out, saslServer));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed78b14e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslResponseWithNegotiatedCipherOption.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslResponseWithNegotiatedCipherOption.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslResponseWithNegotiatedCipherOption.java
new file mode 100644
index 0000000..f69441b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslResponseWithNegotiatedCipherOption.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.crypto.CipherOption;
+
+/**
+ * Simple holder pairing the payload bytes of a SASL response with a
+ * {@link CipherOption}. Both fields are final and package-private. The
+ * references are stored exactly as given, without copying.
+ */
+@InterfaceAudience.Private
+public class SaslResponseWithNegotiatedCipherOption {
+  /** Cipher option carried with the response, as passed to the constructor. */
+  final CipherOption cipherOption;
+  /** Payload bytes of the response, as passed to the constructor. */
+  final byte[] payload;
+
+  /**
+   * Creates a holder for the given payload and cipher option.
+   *
+   * @param payload payload bytes of the SASL response
+   * @param cipherOption cipher option accompanying the payload
+   */
+  public SaslResponseWithNegotiatedCipherOption(byte[] payload,
+      CipherOption cipherOption) {
+    this.cipherOption = cipherOption;
+    this.payload = payload;
+  }
+}
\ No newline at end of file