You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2015/09/04 23:42:42 UTC
[37/50] [abbrv] hadoop git commit: HDFS-9002. Move
o.a.h.hdfs.net/*Peer classes to hdfs-client. Contributed by Mingliang Liu.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed78b14e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
index d921507..1e561cc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.protocolPB;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import com.google.protobuf.CodedInputStream;
+import org.apache.hadoop.crypto.CipherOption;
+import org.apache.hadoop.crypto.CipherSuite;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -264,4 +266,104 @@ public class PBHelperClient {
assert size >= 0;
return new ExactSizeInputStream(input, size);
}
+
+ public static CipherOption convert(HdfsProtos.CipherOptionProto proto) {
+ if (proto != null) {
+ CipherSuite suite = null;
+ if (proto.getSuite() != null) {
+ suite = convert(proto.getSuite());
+ }
+ byte[] inKey = null;
+ if (proto.getInKey() != null) {
+ inKey = proto.getInKey().toByteArray();
+ }
+ byte[] inIv = null;
+ if (proto.getInIv() != null) {
+ inIv = proto.getInIv().toByteArray();
+ }
+ byte[] outKey = null;
+ if (proto.getOutKey() != null) {
+ outKey = proto.getOutKey().toByteArray();
+ }
+ byte[] outIv = null;
+ if (proto.getOutIv() != null) {
+ outIv = proto.getOutIv().toByteArray();
+ }
+ return new CipherOption(suite, inKey, inIv, outKey, outIv);
+ }
+ return null;
+ }
+
+ public static CipherSuite convert(HdfsProtos.CipherSuiteProto proto) {
+ switch (proto) {
+ case AES_CTR_NOPADDING:
+ return CipherSuite.AES_CTR_NOPADDING;
+ default:
+ // Set to UNKNOWN and stash the unknown enum value
+ CipherSuite suite = CipherSuite.UNKNOWN;
+ suite.setUnknownValue(proto.getNumber());
+ return suite;
+ }
+ }
+
+ public static HdfsProtos.CipherOptionProto convert(CipherOption option) {
+ if (option != null) {
+ HdfsProtos.CipherOptionProto.Builder builder = HdfsProtos.CipherOptionProto.
+ newBuilder();
+ if (option.getCipherSuite() != null) {
+ builder.setSuite(convert(option.getCipherSuite()));
+ }
+ if (option.getInKey() != null) {
+ builder.setInKey(ByteString.copyFrom(option.getInKey()));
+ }
+ if (option.getInIv() != null) {
+ builder.setInIv(ByteString.copyFrom(option.getInIv()));
+ }
+ if (option.getOutKey() != null) {
+ builder.setOutKey(ByteString.copyFrom(option.getOutKey()));
+ }
+ if (option.getOutIv() != null) {
+ builder.setOutIv(ByteString.copyFrom(option.getOutIv()));
+ }
+ return builder.build();
+ }
+ return null;
+ }
+
+ public static HdfsProtos.CipherSuiteProto convert(CipherSuite suite) {
+ switch (suite) {
+ case UNKNOWN:
+ return HdfsProtos.CipherSuiteProto.UNKNOWN;
+ case AES_CTR_NOPADDING:
+ return HdfsProtos.CipherSuiteProto.AES_CTR_NOPADDING;
+ default:
+ return null;
+ }
+ }
+
+ public static List<HdfsProtos.CipherOptionProto> convertCipherOptions(
+ List<CipherOption> options) {
+ if (options != null) {
+ List<HdfsProtos.CipherOptionProto> protos =
+ Lists.newArrayListWithCapacity(options.size());
+ for (CipherOption option : options) {
+ protos.add(convert(option));
+ }
+ return protos;
+ }
+ return null;
+ }
+
+ public static List<CipherOption> convertCipherOptionProtos(
+ List<HdfsProtos.CipherOptionProto> protos) {
+ if (protos != null) {
+ List<CipherOption> options =
+ Lists.newArrayListWithCapacity(protos.size());
+ for (HdfsProtos.CipherOptionProto proto : protos) {
+ options.add(convert(proto));
+ }
+ return options;
+ }
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed78b14e/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 42460ed..ce3fbb4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -883,6 +883,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8890. Allow admin to specify which blockpools the balancer should run
on. (Chris Trezzo via mingma)
+ HDFS-9002. Move o.a.h.hdfs.net/*Peer classes to hdfs-client.
+ (Mingliang Liu via wheat9)
+
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed78b14e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 268a5b9..95e9ad4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -99,7 +99,6 @@ import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
import org.apache.hadoop.hdfs.net.Peer;
-import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.AclException;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
@@ -3018,7 +3017,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
try {
sock = socketFactory.createSocket();
NetUtils.connect(sock, addr, getRandomLocalInterfaceAddr(), socketTimeout);
- peer = TcpPeerServer.peerFromSocketAndKey(saslClient, sock, this,
+ peer = DFSUtilClient.peerFromSocketAndKey(saslClient, sock, this,
blockToken, datanodeId);
peer.setReadTimeout(socketTimeout);
peer.setWriteTimeout(socketTimeout);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed78b14e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 6420b55..84858f6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -599,14 +599,28 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
// Security-related configs
public static final String DFS_ENCRYPT_DATA_TRANSFER_KEY = "dfs.encrypt.data.transfer";
public static final boolean DFS_ENCRYPT_DATA_TRANSFER_DEFAULT = false;
- public static final String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY = "dfs.encrypt.data.transfer.cipher.key.bitlength";
- public static final int DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT = 128;
- public static final String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY = "dfs.encrypt.data.transfer.cipher.suites";
+ @Deprecated
+ public static final String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY =
+ HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY;
+ @Deprecated
+ public static final int DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT =
+ HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT;
+ @Deprecated
+ public static final String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY =
+ HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
public static final String DFS_DATA_ENCRYPTION_ALGORITHM_KEY = "dfs.encrypt.data.transfer.algorithm";
- public static final String DFS_TRUSTEDCHANNEL_RESOLVER_CLASS = "dfs.trustedchannel.resolver.class";
- public static final String DFS_DATA_TRANSFER_PROTECTION_KEY = "dfs.data.transfer.protection";
- public static final String DFS_DATA_TRANSFER_PROTECTION_DEFAULT = "";
- public static final String DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY = "dfs.data.transfer.saslproperties.resolver.class";
+ @Deprecated
+ public static final String DFS_TRUSTEDCHANNEL_RESOLVER_CLASS =
+ HdfsClientConfigKeys.DFS_TRUSTEDCHANNEL_RESOLVER_CLASS;
+ @Deprecated
+ public static final String DFS_DATA_TRANSFER_PROTECTION_KEY =
+ HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
+ @Deprecated
+ public static final String DFS_DATA_TRANSFER_PROTECTION_DEFAULT =
+ HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_DEFAULT;
+ @Deprecated
+ public static final String DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY =
+ HdfsClientConfigKeys.DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY;
public static final int DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES_DEFAULT = 100;
public static final String DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES = "dfs.namenode.list.encryption.zones.num.responses";
public static final String DFS_ENCRYPTION_KEY_PROVIDER_URI =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed78b14e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/BasicInetPeer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/BasicInetPeer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/BasicInetPeer.java
deleted file mode 100644
index a9f33e7..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/BasicInetPeer.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.net;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.nio.channels.ReadableByteChannel;
-
-import org.apache.hadoop.net.unix.DomainSocket;
-
-/**
- * Represents a peer that we communicate with by using a basic Socket
- * that has no associated Channel.
- *
- */
-class BasicInetPeer implements Peer {
- private final Socket socket;
- private final OutputStream out;
- private final InputStream in;
- private final boolean isLocal;
-
- public BasicInetPeer(Socket socket) throws IOException {
- this.socket = socket;
- this.out = socket.getOutputStream();
- this.in = socket.getInputStream();
- this.isLocal = socket.getInetAddress().equals(socket.getLocalAddress());
- }
-
- @Override
- public ReadableByteChannel getInputStreamChannel() {
- /*
- * This Socket has no channel, so there's nothing to return here.
- */
- return null;
- }
-
- @Override
- public void setReadTimeout(int timeoutMs) throws IOException {
- socket.setSoTimeout(timeoutMs);
- }
-
- @Override
- public int getReceiveBufferSize() throws IOException {
- return socket.getReceiveBufferSize();
- }
-
- @Override
- public boolean getTcpNoDelay() throws IOException {
- return socket.getTcpNoDelay();
- }
-
- @Override
- public void setWriteTimeout(int timeoutMs) {
- /*
- * We can't implement write timeouts. :(
- *
- * Java provides no facility to set a blocking write timeout on a Socket.
- * You can simulate a blocking write with a timeout by using
- * non-blocking I/O. However, we can't use nio here, because this Socket
- * doesn't have an associated Channel.
- *
- * See http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4031100 for
- * more details.
- */
- }
-
- @Override
- public boolean isClosed() {
- return socket.isClosed();
- }
-
- @Override
- public void close() throws IOException {
- socket.close();
- }
-
- @Override
- public String getRemoteAddressString() {
- return socket.getRemoteSocketAddress().toString();
- }
-
- @Override
- public String getLocalAddressString() {
- return socket.getLocalSocketAddress().toString();
- }
-
- @Override
- public InputStream getInputStream() throws IOException {
- return in;
- }
-
- @Override
- public OutputStream getOutputStream() throws IOException {
- return out;
- }
-
- @Override
- public boolean isLocal() {
- return isLocal;
- }
-
- @Override
- public String toString() {
- return "BasicInetPeer(" + socket.toString() + ")";
- }
-
- @Override
- public DomainSocket getDomainSocket() {
- return null;
- }
-
- @Override
- public boolean hasSecureChannel() {
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed78b14e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java
deleted file mode 100644
index da660c7..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.net;
-
-import java.io.IOException;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
-import org.apache.hadoop.net.unix.DomainSocket;
-
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.channels.ReadableByteChannel;
-
-/**
- * Represents a peer that we communicate with by using an encrypted
- * communications medium.
- */
-@InterfaceAudience.Private
-public class EncryptedPeer implements Peer {
- private final Peer enclosedPeer;
-
- /**
- * An encrypted InputStream.
- */
- private final InputStream in;
-
- /**
- * An encrypted OutputStream.
- */
- private final OutputStream out;
-
- /**
- * An encrypted ReadableByteChannel.
- */
- private final ReadableByteChannel channel;
-
- public EncryptedPeer(Peer enclosedPeer, IOStreamPair ios) {
- this.enclosedPeer = enclosedPeer;
- this.in = ios.in;
- this.out = ios.out;
- this.channel = ios.in instanceof ReadableByteChannel ?
- (ReadableByteChannel)ios.in : null;
- }
-
- @Override
- public ReadableByteChannel getInputStreamChannel() {
- return channel;
- }
-
- @Override
- public void setReadTimeout(int timeoutMs) throws IOException {
- enclosedPeer.setReadTimeout(timeoutMs);
- }
-
- @Override
- public int getReceiveBufferSize() throws IOException {
- return enclosedPeer.getReceiveBufferSize();
- }
-
- @Override
- public boolean getTcpNoDelay() throws IOException {
- return enclosedPeer.getTcpNoDelay();
- }
-
- @Override
- public void setWriteTimeout(int timeoutMs) throws IOException {
- enclosedPeer.setWriteTimeout(timeoutMs);
- }
-
- @Override
- public boolean isClosed() {
- return enclosedPeer.isClosed();
- }
-
- @Override
- public void close() throws IOException {
- try {
- in.close();
- } finally {
- try {
- out.close();
- } finally {
- enclosedPeer.close();
- }
- }
- }
-
- @Override
- public String getRemoteAddressString() {
- return enclosedPeer.getRemoteAddressString();
- }
-
- @Override
- public String getLocalAddressString() {
- return enclosedPeer.getLocalAddressString();
- }
-
- @Override
- public InputStream getInputStream() throws IOException {
- return in;
- }
-
- @Override
- public OutputStream getOutputStream() throws IOException {
- return out;
- }
-
- @Override
- public boolean isLocal() {
- return enclosedPeer.isLocal();
- }
-
- @Override
- public String toString() {
- return "EncryptedPeer(" + enclosedPeer + ")";
- }
-
- @Override
- public DomainSocket getDomainSocket() {
- return enclosedPeer.getDomainSocket();
- }
-
- @Override
- public boolean hasSecureChannel() {
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed78b14e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/NioInetPeer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/NioInetPeer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/NioInetPeer.java
deleted file mode 100644
index 5bb4f56..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/NioInetPeer.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.net;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.nio.channels.ReadableByteChannel;
-
-import org.apache.hadoop.net.SocketInputStream;
-import org.apache.hadoop.net.SocketOutputStream;
-import org.apache.hadoop.net.unix.DomainSocket;
-
-/**
- * Represents a peer that we communicate with by using non-blocking I/O
- * on a Socket.
- */
-class NioInetPeer implements Peer {
- private final Socket socket;
-
- /**
- * An InputStream which simulates blocking I/O with timeouts using NIO.
- */
- private final SocketInputStream in;
-
- /**
- * An OutputStream which simulates blocking I/O with timeouts using NIO.
- */
- private final SocketOutputStream out;
-
- private final boolean isLocal;
-
- NioInetPeer(Socket socket) throws IOException {
- this.socket = socket;
- this.in = new SocketInputStream(socket.getChannel(), 0);
- this.out = new SocketOutputStream(socket.getChannel(), 0);
- this.isLocal = socket.getInetAddress().equals(socket.getLocalAddress());
- }
-
- @Override
- public ReadableByteChannel getInputStreamChannel() {
- return in;
- }
-
- @Override
- public void setReadTimeout(int timeoutMs) throws IOException {
- in.setTimeout(timeoutMs);
- }
-
- @Override
- public int getReceiveBufferSize() throws IOException {
- return socket.getReceiveBufferSize();
- }
-
- @Override
- public boolean getTcpNoDelay() throws IOException {
- return socket.getTcpNoDelay();
- }
-
- @Override
- public void setWriteTimeout(int timeoutMs) throws IOException {
- out.setTimeout(timeoutMs);
- }
-
- @Override
- public boolean isClosed() {
- return socket.isClosed();
- }
-
- @Override
- public void close() throws IOException {
- // We always close the outermost streams-- in this case, 'in' and 'out'
- // Closing either one of these will also close the Socket.
- try {
- in.close();
- } finally {
- out.close();
- }
- }
-
- @Override
- public String getRemoteAddressString() {
- return socket.getRemoteSocketAddress().toString();
- }
-
- @Override
- public String getLocalAddressString() {
- return socket.getLocalSocketAddress().toString();
- }
-
- @Override
- public InputStream getInputStream() throws IOException {
- return in;
- }
-
- @Override
- public OutputStream getOutputStream() throws IOException {
- return out;
- }
-
- @Override
- public boolean isLocal() {
- return isLocal;
- }
-
- @Override
- public String toString() {
- return "NioInetPeer(" + socket.toString() + ")";
- }
-
- @Override
- public DomainSocket getDomainSocket() {
- return null;
- }
-
- @Override
- public boolean hasSecureChannel() {
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed78b14e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java
index 2a547e0..e31e46a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java
@@ -20,22 +20,15 @@ package org.apache.hadoop.hdfs.net;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
-import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
-import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
-import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.security.token.Token;
@InterfaceAudience.Private
public class TcpPeerServer implements PeerServer {
@@ -43,60 +36,6 @@ public class TcpPeerServer implements PeerServer {
private final ServerSocket serverSocket;
- public static Peer peerFromSocket(Socket socket)
- throws IOException {
- Peer peer = null;
- boolean success = false;
- try {
- // TCP_NODELAY is crucial here because of bad interactions between
- // Nagle's Algorithm and Delayed ACKs. With connection keepalive
- // between the client and DN, the conversation looks like:
- // 1. Client -> DN: Read block X
- // 2. DN -> Client: data for block X
- // 3. Client -> DN: Status OK (successful read)
- // 4. Client -> DN: Read block Y
- // The fact that step #3 and #4 are both in the client->DN direction
- // triggers Nagling. If the DN is using delayed ACKs, this results
- // in a delay of 40ms or more.
- //
- // TCP_NODELAY disables nagling and thus avoids this performance
- // disaster.
- socket.setTcpNoDelay(true);
- SocketChannel channel = socket.getChannel();
- if (channel == null) {
- peer = new BasicInetPeer(socket);
- } else {
- peer = new NioInetPeer(socket);
- }
- success = true;
- return peer;
- } finally {
- if (!success) {
- if (peer != null) peer.close();
- socket.close();
- }
- }
- }
-
- public static Peer peerFromSocketAndKey(
- SaslDataTransferClient saslClient, Socket s,
- DataEncryptionKeyFactory keyFactory,
- Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
- throws IOException {
- Peer peer = null;
- boolean success = false;
- try {
- peer = peerFromSocket(s);
- peer = saslClient.peerSend(peer, keyFactory, blockToken, datanodeId);
- success = true;
- return peer;
- } finally {
- if (!success) {
- IOUtils.cleanup(null, peer);
- }
- }
- }
-
/**
* Create a non-secure TcpPeerServer.
*
@@ -136,7 +75,7 @@ public class TcpPeerServer implements PeerServer {
@Override
public Peer accept() throws IOException, SocketTimeoutException {
- Peer peer = peerFromSocket(serverSocket.accept());
+ Peer peer = DFSUtilClient.peerFromSocket(serverSocket.accept());
return peer;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed78b14e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java
deleted file mode 100644
index 23407f8..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.protocol.datatransfer;
-
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-
-/**
- * A little struct class to wrap an InputStream and an OutputStream.
- */
-@InterfaceAudience.Private
-public class IOStreamPair {
- public final InputStream in;
- public final OutputStream out;
-
- public IOStreamPair(InputStream in, OutputStream out) {
- this.in = in;
- this.out = out;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed78b14e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/TrustedChannelResolver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/TrustedChannelResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/TrustedChannelResolver.java
deleted file mode 100644
index 9e6a43d..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/TrustedChannelResolver.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.protocol.datatransfer;
-
-import java.net.InetAddress;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.util.ReflectionUtils;
-
-/**
- * Class used to indicate whether a channel is trusted or not.
- * The default implementation is to return false indicating that
- * the channel is not trusted.
- * This class can be overridden to provide custom logic to determine
- * whether a channel is trusted or not.
- * The custom class can be specified via configuration.
- *
- */
-public class TrustedChannelResolver implements Configurable {
- Configuration conf;
-
- /**
- * Returns an instance of TrustedChannelResolver.
- * Looks up the configuration to see if there is custom class specified.
- * @param conf
- * @return TrustedChannelResolver
- */
- public static TrustedChannelResolver getInstance(Configuration conf) {
- Class<? extends TrustedChannelResolver> clazz =
- conf.getClass(
- DFSConfigKeys.DFS_TRUSTEDCHANNEL_RESOLVER_CLASS,
- TrustedChannelResolver.class, TrustedChannelResolver.class);
- return ReflectionUtils.newInstance(clazz, conf);
- }
-
- @Override
- public void setConf(Configuration conf) {
- this.conf = conf;
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- /**
- * Return boolean value indicating whether a channel is trusted or not
- * from a client's perspective.
- * @return true if the channel is trusted and false otherwise.
- */
- public boolean isTrusted() {
- return false;
- }
-
-
- /**
- * Identify boolean value indicating whether a channel is trusted or not.
- * @param peerAddress address of the peer
- * @return true if the channel is trusted and false otherwise.
- */
- public boolean isTrusted(InetAddress peerAddress) {
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed78b14e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataEncryptionKeyFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataEncryptionKeyFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataEncryptionKeyFactory.java
deleted file mode 100644
index 959cba0..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataEncryptionKeyFactory.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
-
-/**
- * Creates a new {@link DataEncryptionKey} on demand.
- */
-@InterfaceAudience.Private
-public interface DataEncryptionKeyFactory {
-
- /**
- * Creates a new DataEncryptionKey.
- *
- * @return DataEncryptionKey newly created
- * @throws IOException for any error
- */
- DataEncryptionKey newDataEncryptionKey() throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed78b14e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java
deleted file mode 100644
index 852819f..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java
+++ /dev/null
@@ -1,519 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
-
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION;
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
-import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.InetAddress;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import javax.security.sasl.Sasl;
-
-import org.apache.commons.codec.binary.Base64;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.crypto.CipherOption;
-import org.apache.hadoop.crypto.CipherSuite;
-import org.apache.hadoop.crypto.CryptoCodec;
-import org.apache.hadoop.crypto.CryptoInputStream;
-import org.apache.hadoop.crypto.CryptoOutputStream;
-import org.apache.hadoop.hdfs.net.Peer;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
-import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CipherOptionProto;
-import org.apache.hadoop.hdfs.protocolPB.PBHelper;
-import org.apache.hadoop.security.SaslPropertiesResolver;
-import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
-import com.google.common.net.InetAddresses;
-import com.google.protobuf.ByteString;
-
-/**
- * Utility methods implementing SASL negotiation for DataTransferProtocol.
- */
-@InterfaceAudience.Private
-public final class DataTransferSaslUtil {
-
- private static final Logger LOG = LoggerFactory.getLogger(
- DataTransferSaslUtil.class);
-
- /**
- * Delimiter for the three-part SASL username string.
- */
- public static final String NAME_DELIMITER = " ";
-
- /**
- * Sent by clients and validated by servers. We use a number that's unlikely
- * to ever be sent as the value of the DATA_TRANSFER_VERSION.
- */
- public static final int SASL_TRANSFER_MAGIC_NUMBER = 0xDEADBEEF;
-
- /**
- * Checks that SASL negotiation has completed for the given participant, and
- * the negotiated quality of protection is included in the given SASL
- * properties and therefore acceptable.
- *
- * @param sasl participant to check
- * @param saslProps properties of SASL negotiation
- * @throws IOException for any error
- */
- public static void checkSaslComplete(SaslParticipant sasl,
- Map<String, String> saslProps) throws IOException {
- if (!sasl.isComplete()) {
- throw new IOException("Failed to complete SASL handshake");
- }
- Set<String> requestedQop = ImmutableSet.copyOf(Arrays.asList(
- saslProps.get(Sasl.QOP).split(",")));
- String negotiatedQop = sasl.getNegotiatedQop();
- LOG.debug("Verifying QOP, requested QOP = {}, negotiated QOP = {}",
- requestedQop, negotiatedQop);
- if (!requestedQop.contains(negotiatedQop)) {
- throw new IOException(String.format("SASL handshake completed, but " +
- "channel does not have acceptable quality of protection, " +
- "requested = %s, negotiated = %s", requestedQop, negotiatedQop));
- }
- }
-
- /**
- * Check whether requested SASL Qop contains privacy.
- *
- * @param saslProps properties of SASL negotiation
- * @return boolean true if privacy exists
- */
- public static boolean requestedQopContainsPrivacy(
- Map<String, String> saslProps) {
- Set<String> requestedQop = ImmutableSet.copyOf(Arrays.asList(
- saslProps.get(Sasl.QOP).split(",")));
- return requestedQop.contains("auth-conf");
- }
-
- /**
- * Creates SASL properties required for an encrypted SASL negotiation.
- *
- * @param encryptionAlgorithm to use for SASL negotation
- * @return properties of encrypted SASL negotiation
- */
- public static Map<String, String> createSaslPropertiesForEncryption(
- String encryptionAlgorithm) {
- Map<String, String> saslProps = Maps.newHashMapWithExpectedSize(3);
- saslProps.put(Sasl.QOP, QualityOfProtection.PRIVACY.getSaslQop());
- saslProps.put(Sasl.SERVER_AUTH, "true");
- saslProps.put("com.sun.security.sasl.digest.cipher", encryptionAlgorithm);
- return saslProps;
- }
-
- /**
- * For an encrypted SASL negotiation, encodes an encryption key to a SASL
- * password.
- *
- * @param encryptionKey to encode
- * @return key encoded as SASL password
- */
- public static char[] encryptionKeyToPassword(byte[] encryptionKey) {
- return new String(Base64.encodeBase64(encryptionKey, false), Charsets.UTF_8)
- .toCharArray();
- }
-
- /**
- * Returns InetAddress from peer. The getRemoteAddressString has the form
- * [host][/ip-address]:port. The host may be missing. The IP address (and
- * preceding '/') may be missing. The port preceded by ':' is always present.
- *
- * @param peer
- * @return InetAddress from peer
- */
- public static InetAddress getPeerAddress(Peer peer) {
- String remoteAddr = peer.getRemoteAddressString().split(":")[0];
- int slashIdx = remoteAddr.indexOf('/');
- return InetAddresses.forString(slashIdx != -1 ?
- remoteAddr.substring(slashIdx + 1, remoteAddr.length()) :
- remoteAddr);
- }
-
- /**
- * Creates a SaslPropertiesResolver from the given configuration. This method
- * works by cloning the configuration, translating configuration properties
- * specific to DataTransferProtocol to what SaslPropertiesResolver expects,
- * and then delegating to SaslPropertiesResolver for initialization. This
- * method returns null if SASL protection has not been configured for
- * DataTransferProtocol.
- *
- * @param conf configuration to read
- * @return SaslPropertiesResolver for DataTransferProtocol, or null if not
- * configured
- */
- public static SaslPropertiesResolver getSaslPropertiesResolver(
- Configuration conf) {
- String qops = conf.get(DFS_DATA_TRANSFER_PROTECTION_KEY);
- if (qops == null || qops.isEmpty()) {
- LOG.debug("DataTransferProtocol not using SaslPropertiesResolver, no " +
- "QOP found in configuration for {}", DFS_DATA_TRANSFER_PROTECTION_KEY);
- return null;
- }
- Configuration saslPropsResolverConf = new Configuration(conf);
- saslPropsResolverConf.set(HADOOP_RPC_PROTECTION, qops);
- Class<? extends SaslPropertiesResolver> resolverClass = conf.getClass(
- HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS,
- SaslPropertiesResolver.class, SaslPropertiesResolver.class);
- resolverClass = conf.getClass(DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY,
- resolverClass, SaslPropertiesResolver.class);
- saslPropsResolverConf.setClass(HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS,
- resolverClass, SaslPropertiesResolver.class);
- SaslPropertiesResolver resolver = SaslPropertiesResolver.getInstance(
- saslPropsResolverConf);
- LOG.debug("DataTransferProtocol using SaslPropertiesResolver, configured " +
- "QOP {} = {}, configured class {} = {}", DFS_DATA_TRANSFER_PROTECTION_KEY, qops,
- DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY, resolverClass);
- return resolver;
- }
-
- /**
- * Reads a SASL negotiation message.
- *
- * @param in stream to read
- * @return bytes of SASL negotiation messsage
- * @throws IOException for any error
- */
- public static byte[] readSaslMessage(InputStream in) throws IOException {
- DataTransferEncryptorMessageProto proto =
- DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in));
- if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
- throw new InvalidEncryptionKeyException(proto.getMessage());
- } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
- throw new IOException(proto.getMessage());
- } else {
- return proto.getPayload().toByteArray();
- }
- }
-
- /**
- * Reads a SASL negotiation message and negotiation cipher options.
- *
- * @param in stream to read
- * @param cipherOptions list to store negotiation cipher options
- * @return byte[] SASL negotiation message
- * @throws IOException for any error
- */
- public static byte[] readSaslMessageAndNegotiationCipherOptions(
- InputStream in, List<CipherOption> cipherOptions) throws IOException {
- DataTransferEncryptorMessageProto proto =
- DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in));
- if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
- throw new InvalidEncryptionKeyException(proto.getMessage());
- } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
- throw new IOException(proto.getMessage());
- } else {
- List<CipherOptionProto> optionProtos = proto.getCipherOptionList();
- if (optionProtos != null) {
- for (CipherOptionProto optionProto : optionProtos) {
- cipherOptions.add(PBHelper.convert(optionProto));
- }
- }
- return proto.getPayload().toByteArray();
- }
- }
-
- /**
- * Negotiate a cipher option which server supports.
- *
- * @param conf the configuration
- * @param options the cipher options which client supports
- * @return CipherOption negotiated cipher option
- */
- public static CipherOption negotiateCipherOption(Configuration conf,
- List<CipherOption> options) throws IOException {
- // Negotiate cipher suites if configured. Currently, the only supported
- // cipher suite is AES/CTR/NoPadding, but the protocol allows multiple
- // values for future expansion.
- String cipherSuites = conf.get(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY);
- if (cipherSuites == null || cipherSuites.isEmpty()) {
- return null;
- }
- if (!cipherSuites.equals(CipherSuite.AES_CTR_NOPADDING.getName())) {
- throw new IOException(String.format("Invalid cipher suite, %s=%s",
- DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuites));
- }
- if (options != null) {
- for (CipherOption option : options) {
- CipherSuite suite = option.getCipherSuite();
- if (suite == CipherSuite.AES_CTR_NOPADDING) {
- int keyLen = conf.getInt(
- DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY,
- DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT) / 8;
- CryptoCodec codec = CryptoCodec.getInstance(conf, suite);
- byte[] inKey = new byte[keyLen];
- byte[] inIv = new byte[suite.getAlgorithmBlockSize()];
- byte[] outKey = new byte[keyLen];
- byte[] outIv = new byte[suite.getAlgorithmBlockSize()];
- codec.generateSecureRandom(inKey);
- codec.generateSecureRandom(inIv);
- codec.generateSecureRandom(outKey);
- codec.generateSecureRandom(outIv);
- return new CipherOption(suite, inKey, inIv, outKey, outIv);
- }
- }
- }
- return null;
- }
-
- /**
- * Send SASL message and negotiated cipher option to client.
- *
- * @param out stream to receive message
- * @param payload to send
- * @param option negotiated cipher option
- * @throws IOException for any error
- */
- public static void sendSaslMessageAndNegotiatedCipherOption(
- OutputStream out, byte[] payload, CipherOption option)
- throws IOException {
- DataTransferEncryptorMessageProto.Builder builder =
- DataTransferEncryptorMessageProto.newBuilder();
-
- builder.setStatus(DataTransferEncryptorStatus.SUCCESS);
- if (payload != null) {
- builder.setPayload(ByteString.copyFrom(payload));
- }
- if (option != null) {
- builder.addCipherOption(PBHelper.convert(option));
- }
-
- DataTransferEncryptorMessageProto proto = builder.build();
- proto.writeDelimitedTo(out);
- out.flush();
- }
-
- /**
- * Create IOStreamPair of {@link org.apache.hadoop.crypto.CryptoInputStream}
- * and {@link org.apache.hadoop.crypto.CryptoOutputStream}
- *
- * @param conf the configuration
- * @param cipherOption negotiated cipher option
- * @param out underlying output stream
- * @param in underlying input stream
- * @param isServer is server side
- * @return IOStreamPair the stream pair
- * @throws IOException for any error
- */
- public static IOStreamPair createStreamPair(Configuration conf,
- CipherOption cipherOption, OutputStream out, InputStream in,
- boolean isServer) throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Creating IOStreamPair of CryptoInputStream and " +
- "CryptoOutputStream.");
- }
- CryptoCodec codec = CryptoCodec.getInstance(conf,
- cipherOption.getCipherSuite());
- byte[] inKey = cipherOption.getInKey();
- byte[] inIv = cipherOption.getInIv();
- byte[] outKey = cipherOption.getOutKey();
- byte[] outIv = cipherOption.getOutIv();
- InputStream cIn = new CryptoInputStream(in, codec,
- isServer ? inKey : outKey, isServer ? inIv : outIv);
- OutputStream cOut = new CryptoOutputStream(out, codec,
- isServer ? outKey : inKey, isServer ? outIv : inIv);
- return new IOStreamPair(cIn, cOut);
- }
-
- /**
- * Sends a SASL negotiation message indicating an error.
- *
- * @param out stream to receive message
- * @param message to send
- * @throws IOException for any error
- */
- public static void sendGenericSaslErrorMessage(OutputStream out,
- String message) throws IOException {
- sendSaslMessage(out, DataTransferEncryptorStatus.ERROR, null, message);
- }
-
- /**
- * Sends a SASL negotiation message.
- *
- * @param out stream to receive message
- * @param payload to send
- * @throws IOException for any error
- */
- public static void sendSaslMessage(OutputStream out, byte[] payload)
- throws IOException {
- sendSaslMessage(out, DataTransferEncryptorStatus.SUCCESS, payload, null);
- }
-
- /**
- * Send a SASL negotiation message and negotiation cipher options to server.
- *
- * @param out stream to receive message
- * @param payload to send
- * @param options cipher options to negotiate
- * @throws IOException for any error
- */
- public static void sendSaslMessageAndNegotiationCipherOptions(
- OutputStream out, byte[] payload, List<CipherOption> options)
- throws IOException {
- DataTransferEncryptorMessageProto.Builder builder =
- DataTransferEncryptorMessageProto.newBuilder();
-
- builder.setStatus(DataTransferEncryptorStatus.SUCCESS);
- if (payload != null) {
- builder.setPayload(ByteString.copyFrom(payload));
- }
- if (options != null) {
- builder.addAllCipherOption(PBHelper.convertCipherOptions(options));
- }
-
- DataTransferEncryptorMessageProto proto = builder.build();
- proto.writeDelimitedTo(out);
- out.flush();
- }
-
- /**
- * Read SASL message and negotiated cipher option from server.
- *
- * @param in stream to read
- * @return SaslResponseWithNegotiatedCipherOption SASL message and
- * negotiated cipher option
- * @throws IOException for any error
- */
- public static SaslResponseWithNegotiatedCipherOption
- readSaslMessageAndNegotiatedCipherOption(InputStream in)
- throws IOException {
- DataTransferEncryptorMessageProto proto =
- DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in));
- if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
- throw new InvalidEncryptionKeyException(proto.getMessage());
- } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
- throw new IOException(proto.getMessage());
- } else {
- byte[] response = proto.getPayload().toByteArray();
- List<CipherOption> options = PBHelper.convertCipherOptionProtos(
- proto.getCipherOptionList());
- CipherOption option = null;
- if (options != null && !options.isEmpty()) {
- option = options.get(0);
- }
- return new SaslResponseWithNegotiatedCipherOption(response, option);
- }
- }
-
- /**
- * Encrypt the key and iv of the negotiated cipher option.
- *
- * @param option negotiated cipher option
- * @param sasl SASL participant representing server
- * @return CipherOption negotiated cipher option which contains the
- * encrypted key and iv
- * @throws IOException for any error
- */
- public static CipherOption wrap(CipherOption option, SaslParticipant sasl)
- throws IOException {
- if (option != null) {
- byte[] inKey = option.getInKey();
- if (inKey != null) {
- inKey = sasl.wrap(inKey, 0, inKey.length);
- }
- byte[] outKey = option.getOutKey();
- if (outKey != null) {
- outKey = sasl.wrap(outKey, 0, outKey.length);
- }
- return new CipherOption(option.getCipherSuite(), inKey, option.getInIv(),
- outKey, option.getOutIv());
- }
-
- return null;
- }
-
- /**
- * Decrypt the key and iv of the negotiated cipher option.
- *
- * @param option negotiated cipher option
- * @param sasl SASL participant representing client
- * @return CipherOption negotiated cipher option which contains the
- * decrypted key and iv
- * @throws IOException for any error
- */
- public static CipherOption unwrap(CipherOption option, SaslParticipant sasl)
- throws IOException {
- if (option != null) {
- byte[] inKey = option.getInKey();
- if (inKey != null) {
- inKey = sasl.unwrap(inKey, 0, inKey.length);
- }
- byte[] outKey = option.getOutKey();
- if (outKey != null) {
- outKey = sasl.unwrap(outKey, 0, outKey.length);
- }
- return new CipherOption(option.getCipherSuite(), inKey, option.getInIv(),
- outKey, option.getOutIv());
- }
-
- return null;
- }
-
- /**
- * Sends a SASL negotiation message.
- *
- * @param out stream to receive message
- * @param status negotiation status
- * @param payload to send
- * @param message to send
- * @throws IOException for any error
- */
- public static void sendSaslMessage(OutputStream out,
- DataTransferEncryptorStatus status, byte[] payload, String message)
- throws IOException {
- DataTransferEncryptorMessageProto.Builder builder =
- DataTransferEncryptorMessageProto.newBuilder();
-
- builder.setStatus(status);
- if (payload != null) {
- builder.setPayload(ByteString.copyFrom(payload));
- }
- if (message != null) {
- builder.setMessage(message);
- }
-
- DataTransferEncryptorMessageProto proto = builder.build();
- proto.writeDelimitedTo(out);
- out.flush();
- }
-
- /**
- * There is no reason to instantiate this class.
- */
- private DataTransferSaslUtil() {
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed78b14e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java
deleted file mode 100644
index 00b131f..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java
+++ /dev/null
@@ -1,498 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
-
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
-import static org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.*;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.InetAddress;
-import java.net.Socket;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.sasl.RealmCallback;
-import javax.security.sasl.RealmChoiceCallback;
-
-import org.apache.commons.codec.binary.Base64;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.crypto.CipherOption;
-import org.apache.hadoop.crypto.CipherSuite;
-import org.apache.hadoop.hdfs.net.EncryptedPeer;
-import org.apache.hadoop.hdfs.net.Peer;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
-import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
-import org.apache.hadoop.security.SaslPropertiesResolver;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.Lists;
-
-/**
- * Negotiates SASL for DataTransferProtocol on behalf of a client. There are
- * two possible supported variants of SASL negotiation: either a general-purpose
- * negotiation supporting any quality of protection, or a specialized
- * negotiation that enforces privacy as the quality of protection using a
- * cryptographically strong encryption key.
- *
- * This class is used in both the HDFS client and the DataNode. The DataNode
- * needs it, because it acts as a client to other DataNodes during write
- * pipelines and block transfers.
- */
-@InterfaceAudience.Private
-public class SaslDataTransferClient {
-
- private static final Logger LOG = LoggerFactory.getLogger(
- SaslDataTransferClient.class);
-
- private final Configuration conf;
- private final AtomicBoolean fallbackToSimpleAuth;
- private final SaslPropertiesResolver saslPropsResolver;
- private final TrustedChannelResolver trustedChannelResolver;
-
- /**
- * Creates a new SaslDataTransferClient. This constructor is used in cases
- * where it is not relevant to track if a secure client did a fallback to
- * simple auth. For intra-cluster connections between data nodes in the same
- * cluster, we can assume that all run under the same security configuration.
- *
- * @param conf the configuration
- * @param saslPropsResolver for determining properties of SASL negotiation
- * @param trustedChannelResolver for identifying trusted connections that do
- * not require SASL negotiation
- */
- public SaslDataTransferClient(Configuration conf,
- SaslPropertiesResolver saslPropsResolver,
- TrustedChannelResolver trustedChannelResolver) {
- this(conf, saslPropsResolver, trustedChannelResolver, null);
- }
-
- /**
- * Creates a new SaslDataTransferClient.
- *
- * @param conf the configuration
- * @param saslPropsResolver for determining properties of SASL negotiation
- * @param trustedChannelResolver for identifying trusted connections that do
- * not require SASL negotiation
- * @param fallbackToSimpleAuth checked on each attempt at general SASL
- * handshake, if true forces use of simple auth
- */
- public SaslDataTransferClient(Configuration conf,
- SaslPropertiesResolver saslPropsResolver,
- TrustedChannelResolver trustedChannelResolver,
- AtomicBoolean fallbackToSimpleAuth) {
- this.conf = conf;
- this.fallbackToSimpleAuth = fallbackToSimpleAuth;
- this.saslPropsResolver = saslPropsResolver;
- this.trustedChannelResolver = trustedChannelResolver;
- }
-
- /**
- * Sends client SASL negotiation for a newly allocated socket if required.
- *
- * @param socket connection socket
- * @param underlyingOut connection output stream
- * @param underlyingIn connection input stream
- * @param encryptionKeyFactory for creation of an encryption key
- * @param accessToken connection block access token
- * @param datanodeId ID of destination DataNode
- * @return new pair of streams, wrapped after SASL negotiation
- * @throws IOException for any error
- */
- public IOStreamPair newSocketSend(Socket socket, OutputStream underlyingOut,
- InputStream underlyingIn, DataEncryptionKeyFactory encryptionKeyFactory,
- Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
- throws IOException {
- // The encryption key factory only returns a key if encryption is enabled.
- DataEncryptionKey encryptionKey = !trustedChannelResolver.isTrusted() ?
- encryptionKeyFactory.newDataEncryptionKey() : null;
- IOStreamPair ios = send(socket.getInetAddress(), underlyingOut,
- underlyingIn, encryptionKey, accessToken, datanodeId);
- return ios != null ? ios : new IOStreamPair(underlyingIn, underlyingOut);
- }
-
- /**
- * Sends client SASL negotiation for a peer if required.
- *
- * @param peer connection peer
- * @param encryptionKeyFactory for creation of an encryption key
- * @param accessToken connection block access token
- * @param datanodeId ID of destination DataNode
- * @return new pair of streams, wrapped after SASL negotiation
- * @throws IOException for any error
- */
- public Peer peerSend(Peer peer, DataEncryptionKeyFactory encryptionKeyFactory,
- Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
- throws IOException {
- IOStreamPair ios = checkTrustAndSend(getPeerAddress(peer),
- peer.getOutputStream(), peer.getInputStream(), encryptionKeyFactory,
- accessToken, datanodeId);
- // TODO: Consider renaming EncryptedPeer to SaslPeer.
- return ios != null ? new EncryptedPeer(peer, ios) : peer;
- }
-
- /**
- * Sends client SASL negotiation for a socket if required.
- *
- * @param socket connection socket
- * @param underlyingOut connection output stream
- * @param underlyingIn connection input stream
- * @param encryptionKeyFactory for creation of an encryption key
- * @param accessToken connection block access token
- * @param datanodeId ID of destination DataNode
- * @return new pair of streams, wrapped after SASL negotiation
- * @throws IOException for any error
- */
- public IOStreamPair socketSend(Socket socket, OutputStream underlyingOut,
- InputStream underlyingIn, DataEncryptionKeyFactory encryptionKeyFactory,
- Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
- throws IOException {
- IOStreamPair ios = checkTrustAndSend(socket.getInetAddress(), underlyingOut,
- underlyingIn, encryptionKeyFactory, accessToken, datanodeId);
- return ios != null ? ios : new IOStreamPair(underlyingIn, underlyingOut);
- }
-
- /**
- * Checks if an address is already trusted and then sends client SASL
- * negotiation if required.
- *
- * @param addr connection address
- * @param underlyingOut connection output stream
- * @param underlyingIn connection input stream
- * @param encryptionKeyFactory for creation of an encryption key
- * @param accessToken connection block access token
- * @param datanodeId ID of destination DataNode
- * @return new pair of streams, wrapped after SASL negotiation
- * @throws IOException for any error
- */
- private IOStreamPair checkTrustAndSend(InetAddress addr,
- OutputStream underlyingOut, InputStream underlyingIn,
- DataEncryptionKeyFactory encryptionKeyFactory,
- Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
- throws IOException {
- if (!trustedChannelResolver.isTrusted() &&
- !trustedChannelResolver.isTrusted(addr)) {
- // The encryption key factory only returns a key if encryption is enabled.
- DataEncryptionKey encryptionKey =
- encryptionKeyFactory.newDataEncryptionKey();
- return send(addr, underlyingOut, underlyingIn, encryptionKey, accessToken,
- datanodeId);
- } else {
- LOG.debug(
- "SASL client skipping handshake on trusted connection for addr = {}, "
- + "datanodeId = {}", addr, datanodeId);
- return null;
- }
- }
-
- /**
- * Sends client SASL negotiation if required. Determines the correct type of
- * SASL handshake based on configuration.
- *
- * @param addr connection address
- * @param underlyingOut connection output stream
- * @param underlyingIn connection input stream
- * @param encryptionKey for an encrypted SASL handshake
- * @param accessToken connection block access token
- * @param datanodeId ID of destination DataNode
- * @return new pair of streams, wrapped after SASL negotiation
- * @throws IOException for any error
- */
- private IOStreamPair send(InetAddress addr, OutputStream underlyingOut,
- InputStream underlyingIn, DataEncryptionKey encryptionKey,
- Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
- throws IOException {
- if (encryptionKey != null) {
- LOG.debug(
- "SASL client doing encrypted handshake for addr = {}, datanodeId = {}",
- addr, datanodeId);
- return getEncryptedStreams(underlyingOut, underlyingIn,
- encryptionKey);
- } else if (!UserGroupInformation.isSecurityEnabled()) {
- LOG.debug(
- "SASL client skipping handshake in unsecured configuration for "
- + "addr = {}, datanodeId = {}", addr, datanodeId);
- return null;
- } else if (SecurityUtil.isPrivilegedPort(datanodeId.getXferPort())) {
- LOG.debug(
- "SASL client skipping handshake in secured configuration with "
- + "privileged port for addr = {}, datanodeId = {}", addr, datanodeId);
- return null;
- } else if (fallbackToSimpleAuth != null && fallbackToSimpleAuth.get()) {
- LOG.debug(
- "SASL client skipping handshake in secured configuration with "
- + "unsecured cluster for addr = {}, datanodeId = {}", addr, datanodeId);
- return null;
- } else if (saslPropsResolver != null) {
- LOG.debug(
- "SASL client doing general handshake for addr = {}, datanodeId = {}",
- addr, datanodeId);
- return getSaslStreams(addr, underlyingOut, underlyingIn, accessToken,
- datanodeId);
- } else {
- // It's a secured cluster using non-privileged ports, but no SASL. The
- // only way this can happen is if the DataNode has
- // ignore.secure.ports.for.testing configured, so this is a rare edge case.
- LOG.debug(
- "SASL client skipping handshake in secured configuration with no SASL "
- + "protection configured for addr = {}, datanodeId = {}",
- addr, datanodeId);
- return null;
- }
- }
-
- /**
- * Sends client SASL negotiation for specialized encrypted handshake.
- *
- * @param underlyingOut connection output stream
- * @param underlyingIn connection input stream
- * @param encryptionKey for an encrypted SASL handshake
- * @return new pair of streams, wrapped after SASL negotiation
- * @throws IOException for any error
- */
- private IOStreamPair getEncryptedStreams(OutputStream underlyingOut,
- InputStream underlyingIn, DataEncryptionKey encryptionKey)
- throws IOException {
- Map<String, String> saslProps = createSaslPropertiesForEncryption(
- encryptionKey.encryptionAlgorithm);
-
- LOG.debug("Client using encryption algorithm {}",
- encryptionKey.encryptionAlgorithm);
-
- String userName = getUserNameFromEncryptionKey(encryptionKey);
- char[] password = encryptionKeyToPassword(encryptionKey.encryptionKey);
- CallbackHandler callbackHandler = new SaslClientCallbackHandler(userName,
- password);
- return doSaslHandshake(underlyingOut, underlyingIn, userName, saslProps,
- callbackHandler);
- }
-
- /**
- * The SASL username for an encrypted handshake consists of the keyId,
- * blockPoolId, and nonce with the first two encoded as Strings, and the third
- * encoded using Base64. The fields are each separated by a single space.
- *
- * @param encryptionKey the encryption key to encode as a SASL username.
- * @return encoded username containing keyId, blockPoolId, and nonce
- */
- private static String getUserNameFromEncryptionKey(
- DataEncryptionKey encryptionKey) {
- return encryptionKey.keyId + NAME_DELIMITER +
- encryptionKey.blockPoolId + NAME_DELIMITER +
- new String(Base64.encodeBase64(encryptionKey.nonce, false), Charsets.UTF_8);
- }
-
- /**
- * Sets user name and password when asked by the client-side SASL object.
- */
- private static final class SaslClientCallbackHandler
- implements CallbackHandler {
-
- private final char[] password;
- private final String userName;
-
- /**
- * Creates a new SaslClientCallbackHandler.
- *
- * @param userName SASL user name
- * @Param password SASL password
- */
- public SaslClientCallbackHandler(String userName, char[] password) {
- this.password = password;
- this.userName = userName;
- }
-
- @Override
- public void handle(Callback[] callbacks) throws IOException,
- UnsupportedCallbackException {
- NameCallback nc = null;
- PasswordCallback pc = null;
- RealmCallback rc = null;
- for (Callback callback : callbacks) {
- if (callback instanceof RealmChoiceCallback) {
- continue;
- } else if (callback instanceof NameCallback) {
- nc = (NameCallback) callback;
- } else if (callback instanceof PasswordCallback) {
- pc = (PasswordCallback) callback;
- } else if (callback instanceof RealmCallback) {
- rc = (RealmCallback) callback;
- } else {
- throw new UnsupportedCallbackException(callback,
- "Unrecognized SASL client callback");
- }
- }
- if (nc != null) {
- nc.setName(userName);
- }
- if (pc != null) {
- pc.setPassword(password);
- }
- if (rc != null) {
- rc.setText(rc.getDefaultText());
- }
- }
- }
-
- /**
- * Sends client SASL negotiation for general-purpose handshake.
- *
- * @param addr connection address
- * @param underlyingOut connection output stream
- * @param underlyingIn connection input stream
- * @param accessToken connection block access token
- * @param datanodeId ID of destination DataNode
- * @return new pair of streams, wrapped after SASL negotiation
- * @throws IOException for any error
- */
- private IOStreamPair getSaslStreams(InetAddress addr,
- OutputStream underlyingOut, InputStream underlyingIn,
- Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
- throws IOException {
- Map<String, String> saslProps = saslPropsResolver.getClientProperties(addr);
-
- String userName = buildUserName(accessToken);
- char[] password = buildClientPassword(accessToken);
- CallbackHandler callbackHandler = new SaslClientCallbackHandler(userName,
- password);
- return doSaslHandshake(underlyingOut, underlyingIn, userName, saslProps,
- callbackHandler);
- }
-
- /**
- * Builds the client's user name for the general-purpose handshake, consisting
- * of the base64-encoded serialized block access token identifier. Note that
- * this includes only the token identifier, not the token itself, which would
- * include the password. The password is a shared secret, and we must not
- * write it on the network during the SASL authentication exchange.
- *
- * @param blockToken for block access
- * @return SASL user name
- */
- private static String buildUserName(Token<BlockTokenIdentifier> blockToken) {
- return new String(Base64.encodeBase64(blockToken.getIdentifier(), false),
- Charsets.UTF_8);
- }
-
- /**
- * Calculates the password on the client side for the general-purpose
- * handshake. The password consists of the block access token's password.
- *
- * @param blockToken for block access
- * @return SASL password
- */
- private char[] buildClientPassword(Token<BlockTokenIdentifier> blockToken) {
- return new String(Base64.encodeBase64(blockToken.getPassword(), false),
- Charsets.UTF_8).toCharArray();
- }
-
- /**
- * This method actually executes the client-side SASL handshake.
- *
- * @param underlyingOut connection output stream
- * @param underlyingIn connection input stream
- * @param userName SASL user name
- * @param saslProps properties of SASL negotiation
- * @param callbackHandler for responding to SASL callbacks
- * @return new pair of streams, wrapped after SASL negotiation
- * @throws IOException for any error
- */
- private IOStreamPair doSaslHandshake(OutputStream underlyingOut,
- InputStream underlyingIn, String userName, Map<String, String> saslProps,
- CallbackHandler callbackHandler) throws IOException {
-
- DataOutputStream out = new DataOutputStream(underlyingOut);
- DataInputStream in = new DataInputStream(underlyingIn);
-
- SaslParticipant sasl= SaslParticipant.createClientSaslParticipant(userName,
- saslProps, callbackHandler);
-
- out.writeInt(SASL_TRANSFER_MAGIC_NUMBER);
- out.flush();
-
- try {
- // Start of handshake - "initial response" in SASL terminology.
- sendSaslMessage(out, new byte[0]);
-
- // step 1
- byte[] remoteResponse = readSaslMessage(in);
- byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
- List<CipherOption> cipherOptions = null;
- if (requestedQopContainsPrivacy(saslProps)) {
- // Negotiate cipher suites if configured. Currently, the only supported
- // cipher suite is AES/CTR/NoPadding, but the protocol allows multiple
- // values for future expansion.
- String cipherSuites = conf.get(
- DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY);
- if (cipherSuites != null && !cipherSuites.isEmpty()) {
- if (!cipherSuites.equals(CipherSuite.AES_CTR_NOPADDING.getName())) {
- throw new IOException(String.format("Invalid cipher suite, %s=%s",
- DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuites));
- }
- CipherOption option = new CipherOption(CipherSuite.AES_CTR_NOPADDING);
- cipherOptions = Lists.newArrayListWithCapacity(1);
- cipherOptions.add(option);
- }
- }
- sendSaslMessageAndNegotiationCipherOptions(out, localResponse,
- cipherOptions);
-
- // step 2 (client-side only)
- SaslResponseWithNegotiatedCipherOption response =
- readSaslMessageAndNegotiatedCipherOption(in);
- localResponse = sasl.evaluateChallengeOrResponse(response.payload);
- assert localResponse == null;
-
- // SASL handshake is complete
- checkSaslComplete(sasl, saslProps);
-
- CipherOption cipherOption = null;
- if (sasl.isNegotiatedQopPrivacy()) {
- // Unwrap the negotiated cipher option
- cipherOption = unwrap(response.cipherOption, sasl);
- }
-
- // If negotiated cipher option is not null, we will use it to create
- // stream pair.
- return cipherOption != null ? createStreamPair(
- conf, cipherOption, underlyingOut, underlyingIn, false) :
- sasl.createStreamPair(out, in);
- } catch (IOException ioe) {
- sendGenericSaslErrorMessage(out, ioe.getMessage());
- throw ioe;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed78b14e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java
index f060beb..95965b5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java
@@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
import static org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.*;
import java.io.ByteArrayInputStream;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed78b14e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java
deleted file mode 100644
index f14a075..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.util.Map;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslException;
-import javax.security.sasl.SaslServer;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
-import org.apache.hadoop.security.SaslInputStream;
-import org.apache.hadoop.security.SaslOutputStream;
-
-/**
- * Strongly inspired by Thrift's TSaslTransport class.
- *
- * Used to abstract over the <code>SaslServer</code> and
- * <code>SaslClient</code> classes, which share a lot of their interface, but
- * unfortunately don't share a common superclass.
- */
-@InterfaceAudience.Private
-class SaslParticipant {
-
- // This has to be set as part of the SASL spec, but it don't matter for
- // our purposes, but may not be empty. It's sent over the wire, so use
- // a short string.
- private static final String SERVER_NAME = "0";
- private static final String PROTOCOL = "hdfs";
- private static final String MECHANISM = "DIGEST-MD5";
-
- // One of these will always be null.
- private final SaslServer saslServer;
- private final SaslClient saslClient;
-
- /**
- * Creates a SaslParticipant wrapping a SaslServer.
- *
- * @param saslProps properties of SASL negotiation
- * @param callbackHandler for handling all SASL callbacks
- * @return SaslParticipant wrapping SaslServer
- * @throws SaslException for any error
- */
- public static SaslParticipant createServerSaslParticipant(
- Map<String, String> saslProps, CallbackHandler callbackHandler)
- throws SaslException {
- return new SaslParticipant(Sasl.createSaslServer(MECHANISM,
- PROTOCOL, SERVER_NAME, saslProps, callbackHandler));
- }
-
- /**
- * Creates a SaslParticipant wrapping a SaslClient.
- *
- * @param userName SASL user name
- * @param saslProps properties of SASL negotiation
- * @param callbackHandler for handling all SASL callbacks
- * @return SaslParticipant wrapping SaslClient
- * @throws SaslException for any error
- */
- public static SaslParticipant createClientSaslParticipant(String userName,
- Map<String, String> saslProps, CallbackHandler callbackHandler)
- throws SaslException {
- return new SaslParticipant(Sasl.createSaslClient(new String[] { MECHANISM },
- userName, PROTOCOL, SERVER_NAME, saslProps, callbackHandler));
- }
-
- /**
- * Private constructor wrapping a SaslServer.
- *
- * @param saslServer to wrap
- */
- private SaslParticipant(SaslServer saslServer) {
- this.saslServer = saslServer;
- this.saslClient = null;
- }
-
- /**
- * Private constructor wrapping a SaslClient.
- *
- * @param saslClient to wrap
- */
- private SaslParticipant(SaslClient saslClient) {
- this.saslServer = null;
- this.saslClient = saslClient;
- }
-
- /**
- * @see {@link SaslServer#evaluateResponse}
- * @see {@link SaslClient#evaluateChallenge}
- */
- public byte[] evaluateChallengeOrResponse(byte[] challengeOrResponse)
- throws SaslException {
- if (saslClient != null) {
- return saslClient.evaluateChallenge(challengeOrResponse);
- } else {
- return saslServer.evaluateResponse(challengeOrResponse);
- }
- }
-
- /**
- * After successful SASL negotation, returns the negotiated quality of
- * protection.
- *
- * @return negotiated quality of protection
- */
- public String getNegotiatedQop() {
- if (saslClient != null) {
- return (String) saslClient.getNegotiatedProperty(Sasl.QOP);
- } else {
- return (String) saslServer.getNegotiatedProperty(Sasl.QOP);
- }
- }
-
- /**
- * After successful SASL negotiation, returns whether it's QOP privacy
- *
- * @return boolean whether it's QOP privacy
- */
- public boolean isNegotiatedQopPrivacy() {
- String qop = getNegotiatedQop();
- return qop != null && "auth-conf".equalsIgnoreCase(qop);
- }
-
- /**
- * Wraps a byte array.
- *
- * @param bytes The array containing the bytes to wrap.
- * @param off The starting position at the array
- * @param len The number of bytes to wrap
- * @return byte[] wrapped bytes
- * @throws SaslException if the bytes cannot be successfully wrapped
- */
- public byte[] wrap(byte[] bytes, int off, int len) throws SaslException {
- if (saslClient != null) {
- return saslClient.wrap(bytes, off, len);
- } else {
- return saslServer.wrap(bytes, off, len);
- }
- }
-
- /**
- * Unwraps a byte array.
- *
- * @param bytes The array containing the bytes to unwrap.
- * @param off The starting position at the array
- * @param len The number of bytes to unwrap
- * @return byte[] unwrapped bytes
- * @throws SaslException if the bytes cannot be successfully unwrapped
- */
- public byte[] unwrap(byte[] bytes, int off, int len) throws SaslException {
- if (saslClient != null) {
- return saslClient.unwrap(bytes, off, len);
- } else {
- return saslServer.unwrap(bytes, off, len);
- }
- }
-
- /**
- * Returns true if SASL negotiation is complete.
- *
- * @return true if SASL negotiation is complete
- */
- public boolean isComplete() {
- if (saslClient != null) {
- return saslClient.isComplete();
- } else {
- return saslServer.isComplete();
- }
- }
-
- /**
- * Return some input/output streams that may henceforth have their
- * communication encrypted, depending on the negotiated quality of protection.
- *
- * @param out output stream to wrap
- * @param in input stream to wrap
- * @return IOStreamPair wrapping the streams
- */
- public IOStreamPair createStreamPair(DataOutputStream out,
- DataInputStream in) {
- if (saslClient != null) {
- return new IOStreamPair(
- new SaslInputStream(in, saslClient),
- new SaslOutputStream(out, saslClient));
- } else {
- return new IOStreamPair(
- new SaslInputStream(in, saslServer),
- new SaslOutputStream(out, saslServer));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed78b14e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslResponseWithNegotiatedCipherOption.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslResponseWithNegotiatedCipherOption.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslResponseWithNegotiatedCipherOption.java
deleted file mode 100644
index f69441b..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslResponseWithNegotiatedCipherOption.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.crypto.CipherOption;
-
-@InterfaceAudience.Private
-public class SaslResponseWithNegotiatedCipherOption {
- final byte[] payload;
- final CipherOption cipherOption;
-
- public SaslResponseWithNegotiatedCipherOption(byte[] payload,
- CipherOption cipherOption) {
- this.payload = payload;
- this.cipherOption = cipherOption;
- }
-}
\ No newline at end of file